Home Tags Posts tagged with "并发"

并发

对于大部分程序员来说,如果你学习的技术,并不能转换为生产工具,那就毫无意义。因此,在学习并发编程之前,我们有必要了解

什么是并发(转载)

作者:大宽宽
链接:https://www.zhihu.com/question/307100151/answer/894486042
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

深度好文!!!值得推荐!!!

【并发】(Concurrency)是由【P进程】引申出来的抽象概念。

上面说到了你可以假设自己一个人按照一定的步骤来铺路,一个人从头干到尾,这是一个“串行”的【P进程】。

但你也可以假设有2个人铺路。比如你可以按照长度分两半,一人铺500m * 50m;也可以按宽度划分,一人铺1000m * 25m;你还可以说让一个人负责铺全部路面的前5个步骤,另外一个人负责铺路面的余下5个步骤。然后你可以进一步想,假如不是雇2个人,而是雇20个人概如何分工呢?你可以混搭按长度,宽度,步骤等各种方式进行拆分。你甚至可以考虑这20个人不是完全一样的,有的能力强,有的能力弱,可以适当的调整工作量的比例等等。

不管怎样拆,都意味着你得到了【并发】的【P进程】。换成说人话就是,你有一套方案,可以让多个人一起把事情做的更高效。注意是“可以“让事情更高效,而不是“必然“让事情更高效。是不是更高效要看到底是怎么执行的,后边会讲。

举个写代码的例子,你有一个很长很长的数组,目标是把每一个数都*2。一个并发的做法就是把数组拆为很多个小段,然后每个小段的元素依次自己*2。这样的程序写出来就是一个【并发】的【程序】。这个程序如果运行起来就是【并发】的【OS进程】。

这时就会出现一个问题,当你想把一个【并发】的【P进程】写成程序时,你怎么用编程语言告诉操作系统你的程序的一些步骤是【并发】的。更确切地说,你需要一个写法(可能是语法,也可能是函数库)表达:

  • 几个任务是【并发】的
  • 【并发】的任务之间是怎么交互协作的

为了解决这两个问题,人们总结了一些方法,并将其称为“并发模型”。

比如:

  • Fork & Join模型(大任务拆解为小任务并发的跑,结果再拼起来)
  • Actor模型(干活的步骤之间直接发消息)
  • CSP模型(干活的步骤之间订阅通话的频道来协作)
  • 线程&锁模型(干活的人共享一个小本本,用来协作。注意小本本不能改乱套了,所以得加锁)
  • ……

以Java中的线程为例,大家想表达【并发】就启动新的Thread(或者某种等价操作,如利用线程池);想让Thread之间交互,就要依靠共享内容。但是【并发】的Thread如果同时修改同一份数据就有可能出错(被称为竞争问题),为了解决这个问题就要引入锁(Lock,或者一些高级的同步工具,如CountdownLatch,Semaphore)。

特别强调下,Java的线程是表达并发的概念的类。这个类在绝大部分操作系统上使用操作系统内核中的【线程】实现。二者之间还是有一些细微的差异。即用开发者用Java Thread写代码表达思路,和操作系统调度线程执行是两个层面的事情。请努力认识到这一点。

再比如Erlang是基于Actor的并发模型(其实这是原教旨主义的OO)。那么就是每个参与【并发】的任务称为Process(又一个进程……,和【P进程】以及【OS进程都不太一样】,叫【E进程】好了,Erlang中的”进程“)。【E进程】之间通过消息来协作。每个【E进程】要不是在处理消息,要不就是在等新的消息。

如果你用go,那么表达并发的工具就是goroutine,goroutine之间协作要用channel。(当然也可以用Sync包加锁,不展开)。

对于并发模型《7周7并发模型》这本书讲的非常好。推荐阅读。书中展示了七种最经典的并发模型和大量的编码实例。

 

为啥要并发

把事情设计为【并发】有什么好处呢?假如能同时干活的人只有1个,其实并没有什么好处。【并发】的方法的总耗时总会>=串行的方法。因为【并发】或多或少总会引入需要协作和沟通成本。最小的代价就是不需要沟通,此时【并发】的方法和串行的方法工作量是一样的。

但是【并发】的巨大优势是在可以干活的人数量变多时,马上得到【并行】的好处。假如我们可以得到一个【并发】的【P进程】,并且真的为其配备足够多的人,那么做事的效率就会高很多。回到软件系统,假如有一个【并发】的【程序】,它在只有1个CPU的核心的机器上可以跑,在2个的CPU的也可以跑,在4核CPU上也可以跑。物理上可用CPU核心越多,程序能够越快执行完。而不管在哪里跑,程序本身不用做变化。编程是一件成本很高的事,能够做到程序不变而适应各种环境,可以极大的降低开发成本。你能想象下为1核心CPU开发的Office软件和4核心的不一样吗?

 

并发和并行的关系是什么

【并发】(Concurrency)这个词的本意是指两件事没有谁先谁后的关系,或者说关系不确定。举个通俗的例子,自然数任何两个数字都可以比较大小。我们可以明确地说5 > 3。但是如果换一个领域,并不是任何两个元素都有明确的顺序关系,或者说“谁在前面谁在后面都是可以的“。

对于任务执行这个领域,对于两个任务A和B,如果我们说他们俩是【并发】的,这就要求不能在任务B里使用A的结果,也不能让A执行时使用B的结果。因此在执行层面,A可以在B之前执行,也可以在B执行,或者A和B交替执行,或者A和B【并行】的执行。不管执行层面怎么折腾,结果都是对的。

反过来,如果A的执行需要B的结果,那也就意味着A和B不是【并发】的,必须让B先执行完,A才可以开始。在实现层面,就可以用加锁、channel等方式来表达“先B后A”。

Rob Pike在一个Talk里(blog.golang.org/concurr)提到了很重要的两个观点:

  • Concurrency is not Parallelism
  • Concurrency enables parallelism & makes parallelism (and scaling and everything else) easy

前一个观点【并发】和【并行】不是一件事,我们都可以理解了。【并发】说的是处理(Deal)的方法;【并行】说的是执行(Execution)的方法。

后一个观点指的是,如果想让一个事情变得容易【并行】,先得让制定一个【并发】的方法。倘若一个事情压根就没有【并发】的方法,那么无论有多少个可以干活的人,也不能【并行】。比如你让20个人不铺路,而是一起去拧同一个灯泡,也只能有一个人踩在梯子上去拧,其他19个人只能看着,啥也干不了。

对于一个问题,能不能找到【并发】的办法,取决于问题本身。有些问题很容易【并发】,有些问题可以一部分【并发】其余的串行(比如对数组排序就是,无论怎么拆,最终也要把每个拆开的问题结果合并到一起再排序才行),有些问题则根本上就不能【并发】。找不到【并发】的方法也就意味着不管有多少CPU核心,也没法【并行】执行。

换一个极端,假如为最多20个人设计了【并发】的方法,结果来了40个人,就意味着40人里有20个人是闲着的,是浪费。也就是说【并行】的上限是由【并发】的方法的设计决定的。这就解释了你吃鸡的时候,4核CPU和8核差别不大,因为这个游戏压根就没设计成可以利用这么多个CPU核心。(BTW,但游戏被设计为能充分利用显卡的多核心)

其实上面只是将CPU核心当作是“做事的人“,再广义一点,比如显卡,网卡,磁盘都是独立的可以干活的人。这些组件之间也可以并行的跑。因此,在设计程序的时候,可以比如把计算和IO任务拆开设计一个【并发】的方法,然后利用CPU和网卡是两个零件来【并行】的跑

常见的误解

你可能看到过下面的论断:

并发是多个任务交替使用CPU,同一时刻只有一个任务在跑;并行是多个任务同时跑

这个理解不能说全错,但是合到一起就形成了错误的理解。这个错误的理解就是:并发和并行是两个并列的,非此即彼的概念,一个状态要不就是并行的,要不就是并发的。这是完全错误的,实际上看到上面的解释你就会发现【并发】和【并行】描述的是两个频道的事情。正如Rob Pike所言,一个是“如何处理”,一个是“如何执行”。因此,对于:

并发是多个任务交替使用CPU,同一时刻只有一个任务在跑

其实正确的理解是:针对一个问题,想到了一个可以拆解为多个【并发】的任务,这些任务执行时因为只有一个CPU只能“切换”的跑。

对于:

并行是多个任务同时跑

其实的意思是:如果这些并行执行的任务是解决同一个问题的,那么他们既是【并发】的,同时也是【并行】的。

那么可不可以做到只【并行】,而不【并发】呢?当然可以,但这也就意味着【并行】的程序之间没有什么关联,各干各的,就像大街上来来往往的陌生人一样。这的确是【并行】,并且是这个世界的常态。但是一群不认识的,各干各的人是不能一起解决问题的,要一起就得有同一个目标,制定一套沟通的方法,形成【并发】的方案。这种形式在现实当中就是“公司”。

0 104

在 JDK1.2 之前Java 的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下线程可以把变量保存本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致JMM(Java内存模型)

要解决这个问题,就需要把变量声明为**volatile**,这就指示 JVM,这个变量是共享且不稳定的每次使用它都到主存中进行读取

所以,volatile 关键字 除了防止 JVM 的指令重排 ,还有一个重要的作用就是保证变量的可见性

volatile关键字的可见性

2.3. 并发编程的三个重要特性

  1. 原子 : 一个的操作或者多次操作,要么所有的操作全部都得到执行并且不会收到任何因素的干扰而中断,要么所有的操作都执行,要么都不执行。synchronized 可以保证代码片段的原子性。
  2. 可见 :当一个变量对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。volatile 关键字可以保证共享变量的可见性。
  3. 有序 :代码在执行的过程中的先后顺序,Java 在编译器以及运行期间的优化,代码的执行顺序未必就是编写代码时候的顺序。volatile 关键字可以禁止指令进行重排序优化。(思考指令重排的原理和作用)

2.4. 说说 synchronized 关键字和 volatile 关键字的区别

synchronized 关键字和 volatile 关键字是两个互补的存在,而不是对立的存在!

  • volatile 关键字是线程同步的轻量级实现,所以**volatile 性能肯定比synchronized关键字要好**。但是**volatile 关键字只能用于变量而 synchronized 关键字可以修饰方法以及代码块**。
  • volatile 关键字能保证数据的可见性,但不能保证数据的原子性。synchronized 关键字两者都能保证。
  • volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性。

 

3. ThreadLocal

3.1. ThreadLocal 简介

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢? JDK 中提供的ThreadLocal类正是为了解决这样的问题。 ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get() 和 set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

再举个简单的例子:

比如有两个人去宝屋收集宝物,这两个共用一个袋子的话肯定会产生争执,但是给他们两个人每个人分配一个袋子的话就不会出现这样的问题。如果把这两个人比作线程的话,那么 ThreadLocal 就是用来避免这两个线程竞争的。

3.2. ThreadLocal 示例

import java.text.SimpleDateFormat;
import java.util.Random;

public class ThreadLocalExample implements Runnable{

     // SimpleDateFormat 不是线程安全的,所以每个线程都要有自己独立的副本
    private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm"));

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalExample obj = new ThreadLocalExample();
        for(int i=0 ; i<10; i++){
            Thread t = new Thread(obj, ""+i);
            Thread.sleep(new Random().nextInt(1000));
            t.start();
        }
    }

    @Override
    public void run() {
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern());
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //formatter pattern is changed here by thread, but it won't reflect to other threads
        formatter.set(new SimpleDateFormat());

        System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern());
    }

}

从输出中可以看出,Thread-0 已经改变了 formatter 的值,但仍然是 thread-2 默认格式化程序与初始化值相同,其他线程也一样。

上面有一段代码用到了创建 ThreadLocal 变量的那段代码用到了 Java8 的知识,它等于下面这段代码,如果你写了下面这段代码的话,IDEA 会提示你转换为 Java8 的格式(IDEA 真的不错!)。因为 ThreadLocal 类在 Java 8 中扩展,使用一个新的方法withInitial(),将 Supplier 功能接口作为参数。

private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){
        @Override
        protected SimpleDateFormat initialValue()
        {
            return new SimpleDateFormat("yyyyMMdd HHmm");
        }
    };

3.3. ThreadLocal 原理

从 Thread类源代码入手。

public class Thread implements Runnable {
 ......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
 ......
}

从上面Thread类 源代码可以看出Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap类对应的 get()set()方法。

ThreadLocal类的set()方法

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

通过上面这些内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。ThrealLocal 类中可以通过Thread.currentThread()获取到当前线程对象后,直接通过getMap(Thread t)可以访问到该线程的ThreadLocalMap对象。

每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为 key ,Object 对象为 value 的键值对。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
 ......
}

比如我们在同一个线程中声明了两个 ThreadLocal 对象的话,会使用 Thread内部都是使用仅有那个ThreadLocalMap 存放数据的,ThreadLocalMap的 key 就是 ThreadLocal对象,value 就是 ThreadLocal 对象调用set方法设置的值。

ThreadLocalMapThreadLocal的静态内部类。

 

3.4. ThreadLocal 内存泄露问题

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal方法后 最好手动调用remove()方法

      static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

弱引用介绍:

如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它 所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。

弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。

4. 线程池

4.1. 为什么要用线程池?

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

4.2. 实现 Runnable 接口和 Callable 接口的区别

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是**Callable 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

Runnable.java

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}

Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

4.3. 执行 execute()方法和 submit()方法的区别是什么呢?

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

我们以**AbstractExecutorService**接口中的一个 submit 方法为例子来看看源代码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

我们再来看看execute()方法:

public void execute(Runnable command) {
      ...
    }


4.4. 如何创建线程池

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

方式一:通过构造方法实现ThreadPoolExecutor构造方法方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

对应 Executors 工具类中的方法如图所示: Executor框架的工具类

4.5 ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

/**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。

4.5.1 ThreadPoolExecutor构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit : keepAliveTime 参数的时间单位。
  3. threadFactory :executor 创建新线程的时候会用到。
  4. handler :饱和策略。关于饱和策略下面单独介绍一下。

4.5.2 ThreadPoolExecutor 饱和策略

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)

4.6 一个简单的线程池 Demo

为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

MyRunnable.java

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

Output:

pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019

4.7 线程池原理分析

承接 4.6 节,我们通过代码输出结果可以看出:线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)

现在,我们就分析上面的输出内容来简单分析一下线程池原理。

**为了搞懂线程池的原理,我们需要首先分析一下 execute方法。**在 4.6 节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

   // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。

图解线程池实现原理

现在,让我们在回到 4.6 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?

没搞懂的话,也没关系,可以看看我的分析:

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务执行完成后,才会执行剩下的 5 个任务。

0 109

CAS及ABA的定义和实现

CAS思想(算法):对于内存中的某一个值data,提供一个旧值A一个新值B。如果提供的旧值V和A相等就把B写入V,这个过程是原子性的。

CAS执行结果要么成功要么失败,对于失败的情形下一般采用不断重试,或者放弃。

举例说明:

public class CAS{
    public static void main(String[] args) {
        /*乐观锁的实现CAS;*/
        String data = "123";//共享数据

        /*更新数据的线程会进行如下操作*/
        boolean flag=true;
        while(flag){
            oldValue = date;//保存原始数据
            newValue = doSomething(oldValue);
            //下面的部分为CAS操作,尝试更新出data值
            if(date == oldValue){//比较在更新之前原值有无更改,如没有
                //则更新:
                data = newValue;
                falg = false;//跳出循环
            }
            else{
                //什么也不做,循环重试
            }
        }
    }
}
/*
   很明显,这样的代码根本不是原子性的,
   因为真正的CAS利用了CPU指令,
   这里只是为了展示执行流程,本意是一样的。
*/

ABA问题

CAS实现的过程是先取出内存中某时刻的数据在下一时刻比较并替换,那么在这个时间差会导致数据的变化,此时就有可能出现ABA问题;

什么是ABA问题?

如果另一个线程修改V值,假设原来是A,先修改成B,再修改会A。当前线程的CAS操作无法分辨当前V值是否发生过变化。

举例说明:

在你非常渴的情况下发现一个盛满水的被子,你一饮而尽,之后再给杯子里面重新倒满水,然后你离开,当杯子的真正主人(当前执行CAS操作的线程)看到杯子还是盛满水的,他当然不知道是否被人喝完重新倒满。解决这个问题的方案的一个策略是 每一次倒水假设有一个自动记录仪记录下(Version版本号),这样主人回来就可以分辨在他离开后是否发生过重新倒满的情况。这也是解决ABA问题目前采用的策略。

解决办法:

1.加版本号

2.用AtomicStampedReference/AtomicMarkableReference解决ABA问题

 

补充:

洪山区最新新开了一家自助餐店,为迎接5-1劳动节,老板决定为每位消费卡余额低于20的用户卡里赠送20元,该活动每位顾客只可享受一次;

很简单就用cas技术,先去用户卡里的余额,然后包装成 AtomicInteger,写一个判断,开启10个线程,然后判断小于20的,一律加20,然后就很开心的交差了。可是过了一段时间,发现账面亏损的厉害,老板起先的预支是2000块,因为店里的会员总共也就100多个,就算每人都符合条件,最多也就2000啊,怎么预支了这么多。小王一下就懵逼了,赶紧debug,tail -f一下日志,这不看不知道,一看吓一跳,有个客户被充值了10次!

解释:

用户要给储值卡充值20元,用户卡内余额15元,A线程首先获取余额是15,然后准备加上20,A线程因为某种原因block超时,系统超时重试提交,B线程成功将余额加上20并且成功提交,此时余额为35。但是紧接着用户又消费了20,所以余额还是15,终于A线程获取到了时间片,它比对之后发现余额还是15,所以A线程就执行了。ABA的问题核心在于一个线程在提交的时候,如果只是根据要修改的值和之前是否一样,这样是无法证明这个值没有被其他线程改过因为在这段时间它的值可能被改为其他值,然后又改原来的,实际上如果避免重复提交就能避免ABA问题,而版本号控制可以避免重复提交