Home

0 11

在 JDK1.2 之前Java 的内存模型实现总是从主存(即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下线程可以把变量保存本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致JMM(Java内存模型)

要解决这个问题,就需要把变量声明为**volatile**,这就指示 JVM,这个变量是共享且不稳定的每次使用它都到主存中进行读取

所以,volatile 关键字 除了防止 JVM 的指令重排 ,还有一个重要的作用就是保证变量的可见性

volatile关键字的可见性

2.3. 并发编程的三个重要特性

  1. 原子 : 一个的操作或者多次操作,要么所有的操作全部都得到执行并且不会收到任何因素的干扰而中断,要么所有的操作都执行,要么都不执行。synchronized 可以保证代码片段的原子性。
  2. 可见 :当一个变量对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。volatile 关键字可以保证共享变量的可见性。
  3. 有序 :代码在执行的过程中的先后顺序,Java 在编译器以及运行期间的优化,代码的执行顺序未必就是编写代码时候的顺序。volatile 关键字可以禁止指令进行重排序优化。(思考指令重排的原理和作用)

2.4. 说说 synchronized 关键字和 volatile 关键字的区别

synchronized 关键字和 volatile 关键字是两个互补的存在,而不是对立的存在!

  • volatile 关键字是线程同步的轻量级实现,所以**volatile 性能肯定比synchronized关键字要好**。但是**volatile 关键字只能用于变量而 synchronized 关键字可以修饰方法以及代码块**。
  • volatile 关键字能保证数据的可见性,但不能保证数据的原子性。synchronized 关键字两者都能保证。
  • volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性。

 

3. ThreadLocal

3.1. ThreadLocal 简介

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。如果想实现每一个线程都有自己的专属本地变量该如何解决呢? JDK 中提供的ThreadLocal类正是为了解决这样的问题。 ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

如果你创建了一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是ThreadLocal变量名的由来。他们可以使用 get() 和 set() 方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。

再举个简单的例子:

比如有两个人去宝屋收集宝物,这两个共用一个袋子的话肯定会产生争执,但是给他们两个人每个人分配一个袋子的话就不会出现这样的问题。如果把这两个人比作线程的话,那么 ThreadLocal 就是用来避免这两个线程竞争的。

3.2. ThreadLocal 示例

import java.text.SimpleDateFormat;
import java.util.Random;

public class ThreadLocalExample implements Runnable{

     // SimpleDateFormat 不是线程安全的,所以每个线程都要有自己独立的副本
    private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm"));

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalExample obj = new ThreadLocalExample();
        for(int i=0 ; i<10; i++){
            Thread t = new Thread(obj, ""+i);
            Thread.sleep(new Random().nextInt(1000));
            t.start();
        }
    }

    @Override
    public void run() {
        System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern());
        try {
            Thread.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //formatter pattern is changed here by thread, but it won't reflect to other threads
        formatter.set(new SimpleDateFormat());

        System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern());
    }

}

从输出中可以看出,Thread-0 已经改变了 formatter 的值,但仍然是 thread-2 默认格式化程序与初始化值相同,其他线程也一样。

上面有一段代码用到了创建 ThreadLocal 变量的那段代码用到了 Java8 的知识,它等于下面这段代码,如果你写了下面这段代码的话,IDEA 会提示你转换为 Java8 的格式(IDEA 真的不错!)。因为 ThreadLocal 类在 Java 8 中扩展,使用一个新的方法withInitial(),将 Supplier 功能接口作为参数。

private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){
        @Override
        protected SimpleDateFormat initialValue()
        {
            return new SimpleDateFormat("yyyyMMdd HHmm");
        }
    };

3.3. ThreadLocal 原理

从 Thread类源代码入手。

public class Thread implements Runnable {
 ......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
 ......
}

从上面Thread类 源代码可以看出Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap类对应的 get()set()方法。

ThreadLocal类的set()方法

public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

通过上面这些内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。ThrealLocal 类中可以通过Thread.currentThread()获取到当前线程对象后,直接通过getMap(Thread t)可以访问到该线程的ThreadLocalMap对象。

每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为 key ,Object 对象为 value 的键值对。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
 ......
}

比如我们在同一个线程中声明了两个 ThreadLocal 对象的话,会使用 Thread内部都是使用仅有那个ThreadLocalMap 存放数据的,ThreadLocalMap的 key 就是 ThreadLocal对象,value 就是 ThreadLocal 对象调用set方法设置的值。

ThreadLocalMapThreadLocal的静态内部类。

 

3.4. ThreadLocal 内存泄露问题

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal方法后 最好手动调用remove()方法

      static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

弱引用介绍:

如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它 所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。

弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。

4. 线程池

4.1. 为什么要用线程池?

池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

4.2. 实现 Runnable 接口和 Callable 接口的区别

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是**Callable 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

Runnable.java

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}

Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

4.3. 执行 execute()方法和 submit()方法的区别是什么呢?

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

我们以**AbstractExecutorService**接口中的一个 submit 方法为例子来看看源代码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

我们再来看看execute()方法:

public void execute(Runnable command) {
      ...
    }


4.4. 如何创建线程池

《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE ,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

方式一:通过构造方法实现ThreadPoolExecutor构造方法方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool : 该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

对应 Executors 工具类中的方法如图所示: Executor框架的工具类

4.5 ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。

/**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

下面这些对创建 非常重要,在后面使用线程池的过程中你一定会用到!所以,务必拿着小本本记清楚。

4.5.1 ThreadPoolExecutor构造函数重要参数分析

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数:

  1. keepAliveTime:当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  2. unit : keepAliveTime 参数的时间单位。
  3. threadFactory :executor 创建新线程的时候会用到。
  4. handler :饱和策略。关于饱和策略下面单独介绍一下。

4.5.2 ThreadPoolExecutor 饱和策略

ThreadPoolExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

举个例子: Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy。在默认情况下,ThreadPoolExecutor 将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor 的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)

4.6 一个简单的线程池 Demo

为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

MyRunnable.java

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

Output:

pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019

4.7 线程池原理分析

承接 4.6 节,我们通过代码输出结果可以看出:线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)

现在,我们就分析上面的输出内容来简单分析一下线程池原理。

**为了搞懂线程池的原理,我们需要首先分析一下 execute方法。**在 4.6 节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

   // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。

图解线程池实现原理

现在,让我们在回到 4.6 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?

没搞懂的话,也没关系,可以看看我的分析:

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务执行完成后,才会执行剩下的 5 个任务。

0 16

CAS及ABA的定义和实现

CAS思想(算法):对于内存中的某一个值data,提供一个旧值A一个新值B。如果提供的旧值V和A相等就把B写入V,这个过程是原子性的。

CAS执行结果要么成功要么失败,对于失败的情形下一般采用不断重试,或者放弃。

举例说明:

public class CAS{
    public static void main(String[] args) {
        /*乐观锁的实现CAS;*/
        String data = "123";//共享数据

        /*更新数据的线程会进行如下操作*/
        boolean flag=true;
        while(flag){
            oldValue = date;//保存原始数据
            newValue = doSomething(oldValue);
            //下面的部分为CAS操作,尝试更新出data值
            if(date == oldValue){//比较在更新之前原值有无更改,如没有
                //则更新:
                data = newValue;
                falg = false;//跳出循环
            }
            else{
                //什么也不做,循环重试
            }
        }
    }
}
/*
   很明显,这样的代码根本不是原子性的,
   因为真正的CAS利用了CPU指令,
   这里只是为了展示执行流程,本意是一样的。
*/

ABA问题

CAS实现的过程是先取出内存中某时刻的数据在下一时刻比较并替换,那么在这个时间差会导致数据的变化,此时就有可能出现ABA问题;

什么是ABA问题?

如果另一个线程修改V值,假设原来是A,先修改成B,再修改会A。当前线程的CAS操作无法分辨当前V值是否发生过变化。

举例说明:

在你非常渴的情况下发现一个盛满水的被子,你一饮而尽,之后再给杯子里面重新倒满水,然后你离开,当杯子的真正主人(当前执行CAS操作的线程)看到杯子还是盛满水的,他当然不知道是否被人喝完重新倒满。解决这个问题的方案的一个策略是 每一次倒水假设有一个自动记录仪记录下(Version版本号),这样主人回来就可以分辨在他离开后是否发生过重新倒满的情况。这也是解决ABA问题目前采用的策略。

解决办法:

1.加版本号

2.用AtomicStampedReference/AtomicMarkableReference解决ABA问题

 

补充:

洪山区最新新开了一家自助餐店,为迎接5-1劳动节,老板决定为每位消费卡余额低于20的用户卡里赠送20元,该活动每位顾客只可享受一次;

很简单就用cas技术,先去用户卡里的余额,然后包装成 AtomicInteger,写一个判断,开启10个线程,然后判断小于20的,一律加20,然后就很开心的交差了。可是过了一段时间,发现账面亏损的厉害,老板起先的预支是2000块,因为店里的会员总共也就100多个,就算每人都符合条件,最多也就2000啊,怎么预支了这么多。小王一下就懵逼了,赶紧debug,tail -f一下日志,这不看不知道,一看吓一跳,有个客户被充值了10次!

解释:

用户要给储值卡充值20元,用户卡内余额15元,A线程首先获取余额是15,然后准备加上20,A线程因为某种原因block超时,系统超时重试提交,B线程成功将余额加上20并且成功提交,此时余额为35。但是紧接着用户又消费了20,所以余额还是15,终于A线程获取到了时间片,它比对之后发现余额还是15,所以A线程就执行了。ABA的问题核心在于一个线程在提交的时候,如果只是根据要修改的值和之前是否一样,这样是无法证明这个值没有被其他线程改过因为在这段时间它的值可能被改为其他值,然后又改原来的,实际上如果避免重复提交就能避免ABA问题,而版本号控制可以避免重复提交

0 13

一个锁可以同时是悲观锁可重入锁公平锁可中断锁等等,就像一个人可以是男人、医生、健身爱好者、并不矛盾;

 

synchronized与Lock

Java加锁的方式有两种:

1.使用synchronized关键字

2.用Lock接口的实现类

synchronized关键字是自动挡,可以满足一切日常驾驶需求;但是如果我们想玩飘逸或其他骚操作(刀片超车),就需要手动挡—各种Lock的实现类

所以如果我们只是想要简单加个锁,对性能也没什么特别的要求,用synchronized关键字就够了;自Java 5之后,才在java.util.concurrent.locks包下有了另外一种方式来实现锁,那就是Lock;也就是说,synchronized是Java语言内置的关键字,而Lock是一个接口,这个接口的实现类在代码层面实现了锁的功能;

这两种方式最大区别就是对于Synchronized来说,它是java语言的关键字,是原生语法层面的互斥,需要jvm实现。而ReentrantLock它是JDK 1.5之后提供的API层面的互斥锁,需要lock()和unlock()方法配合try/finally语句块来完成

便利性:很明显Synchronized的使用比较方便简洁,并且由编译器去保证锁的加锁和释放,而ReenTrantLock需要手工声明来加锁和释放锁,为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。

锁的细粒度和灵活度:很明显ReenTrantLock优于Synchronized

ReentrantLockReadLockWriteLockLock接口最重要的三个实现类对应“可重入锁”“读锁”“写锁”,后面会讲其用途;

ReadWriteLock其实是一个工厂接口,而ReentranReadWriteLock是ReadWriterLock的实现类,它包含两个静态内部类,即ReadLock和WriteLock,这两个静态内部类又实现了Lock接口;

 

各种锁分类的概念、synchronized、各种Lock实现类之间的区别与联系;

 

一、悲观锁和乐观锁

锁的一种宏观分类方式是悲观锁和乐观锁。悲观锁与乐观锁并不是特指某个锁(Java中没有哪个Lock实现类就叫PessimisticLock或OptimisticLock),而是在并发情况下的两种不同策略

悲观锁(Pessimistic Lock),很悲观,往坏的方向想,即每次去拿数据的时候都认为别人会修改,因此在每次拿数据的时候都会上锁,这样别人想拿数据就会被挡住,直到悲观锁被释放;

乐观锁(Optimistic Lock),很乐观,往好的方向想,每次去拿数据的时候都认为别人不会修改,所以不会上锁!不会上锁!不会上锁!但是如果想要更新数据,会在更新前检查在读取至更新这段时间别人有没有修改过这个数据,如果修改过,则重新读取,再次尝试更新,循环上述步骤直到更新成功(也允许更新失败的线程放弃操作)

乐观锁阻塞事务,悲观锁回滚重试,它们各有优缺点,不能固执的认为一种一定好于另一种;乐观锁适用于写操作比较少的情况下即冲突真的很少发生的时候,这样可以省去锁的开销,加到了系统的整个吞吐量;但如果经常发生冲突,上层应用会不断的进行重试,这样反倒是降低了性能,所以这种情况下使用悲观锁就比较合适;

 

二、乐观锁的基础—CAS

说到乐观锁,就必须提到一个概念:CAS

什么是CAS?Compare-and-Swap,即比较并替换,也叫Compare-and-Set的,比较并设置。

1.比较:读取了一个值A,在将其更新为B之前,检查原值是否仍为A(未被其他线程改动);

2.设置:如果是,将A更新为B,结束。如果不是,则什么也不做;

上面的两步操作具有原子性那个,理解为瞬间完成,在CPU看来就是一步操作;

通过CAS实现一个乐观锁;

data = 123; // 共享数据

/* 更新数据的线程会进行如下操作 */
flag = true;
while (flag) {
    oldValue = data; // 保存原始数据
    newValue = doSomething(oldValue); 

    // 下面的部分为CAS操作,尝试更新data的值
    if (data == oldValue) { // 比较
        data = newValue; // 设置
        flag = false; // 结束
    } else {
	// 啥也不干,循环重试
    }
}
/* 
   很明显,这样的代码根本不是原子性的,
   因为真正的CAS利用了CPU指令,
   这里只是为了展示执行流程,本意是一样的。
*/

这是一个简单直观的乐观锁实现,它允许多个线程同时读取(因为根本没有加锁操作),但是只有一个线程可以成功更新数据,并导致其他要更新数据的线程回滚重试 CAS利用CPU指令,从硬件层面保证了操作的原子性,以达到类似于锁的效果。

Java中真正的CAS操作调用的native方法

因为整个过程中并没有“加锁”和“解锁”操作,因此乐观锁策略也被称为无锁编程。换句话说,乐观锁其实不是“锁”,它仅仅是一个循环重试CAS的算法而已!

三、自旋锁

有一种锁叫自旋锁。所谓自旋,说白了就是一个 while(true) 无限循环。

刚刚的乐观锁就有类似的无限循环操作,那么它是自旋锁吗?

感谢评论区养猫的虾的指正。

不是。尽管自旋与 while(true) 的操作是一样的,但还是应该将这两个术语分开。“自旋”这两个字,特指自旋锁的自旋

然而在JDK中并没有自旋锁(SpinLock)这个类,那什么才是自旋锁呢?读完下个小节就知道了。

四、synchronized锁升级:偏向锁 → 轻量级锁 → 重量级锁

前面提到,synchronized关键字就像是汽车的自动档,现在详细讲这个过程。一脚油门踩下去,synchronized会从无锁升级为偏向锁,再升级为轻量级锁,最后升级为重量级锁,就像自动换挡一样。那么自旋锁在哪里呢?这里的轻量级锁就是一种自旋锁

初次执行到synchronized代码块的时候,锁对象变成偏向锁(通过CAS修改对象头里的锁标志位),字面意思是“偏向于第一个获得它的线程”的锁。执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程ID也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。

一旦有第二个线程加入锁竞争,偏向锁就升级为轻量级锁(自旋锁)。这里要明确一下什么是锁竞争如果多个线程轮流获取一个锁,但是每次获取锁的时候都很顺利没有发生阻塞,那么就不存在锁竞争只有当某线程尝试获取锁的时候,发现该锁已经被占用只能等待其释放,这才发生了锁竞争。

在轻量级锁状态下继续锁竞争,没有抢到锁的线程将自旋,即不停地循环判断锁是否能够被成功获取。获取锁的操作,其实就是通过CAS修改对象头里的锁标志位。比较当前锁标志位是否为“释放”,如果是则将其设置为“锁定”,比较并设置是原子性发生的。这就算抢到锁了,然后线程将当前锁的持有者信息修改为自己。

长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗CPU,执行不了任何有效的任务,这种现象叫做忙等(busy-waiting)如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么synchronized就用轻量级锁,允许短时间的忙等现象。这是一种折衷的想法,短时间的忙等,换取线程在用户态和内核态之间切换的开销。

显然,此忙等是有限度的(有个计数器记录自旋次数,默认允许循环10次,可以通过虚拟机参数更改)。如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁(依然是CAS修改锁标志位,但不修改持有锁的线程ID)。当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则直接将自己挂起(而不是忙等),等待将来被唤醒在JDK1.6之前,synchronized直接加重量级锁,很明显现在得到了很好的优化。

一个锁只能按照 偏向锁、轻量级锁、重量级锁的顺序逐渐升级(也有叫锁膨胀的),不允许降级。(补充:可以降级,但是条件比较苛刻)

感谢评论区酷帅俊靓美的问题:
偏向锁的一个特性是,持有锁的线程在执行完同步代码块时不会释放锁。那么当第二个线程执行到这个synchronized代码块时是否一定会发生锁竞争然后升级为轻量级锁呢?
线程A第一次执行完同步代码块后,当线程B尝试获取锁的时候,发现是偏向锁,会判断线程A是否仍然存活。如果线程A仍然存活,将线程A暂停,此时偏向锁升级为轻量级锁,之后线程A继续执行,线程B自旋。但是如果判断结果是线程A不存在了,则线程B持有此偏向锁,锁不升级。
还有人对此有疑惑,我之前确实没有描述清楚,但如果要展开讲,涉及到太多新概念,可以新开一篇了。更何况有些太底层的东西,我没读过源码,没有自信说自己一定是对的。其实在升级为轻量级锁之前,虚拟机会让线程A尽快在安全点挂起,然后在它的栈中“伪造”一些信息,让线程A在被唤醒之后,认为自己一直持有的是轻量级锁。如果线程A之前正在同步代码块中,那么线程B自旋等待即可。如果线程A之前不在同步代码块中,它会在被唤醒后检查到这一情况并立即释放锁,让线程B可以拿到。这部分内容我之前也没有深入研究过,如果有说的不对的,请多多指教啊!

五、可重入锁(递归锁)

可重入锁的字面意思是“可以重新进入的锁,即允许同一个线程多次获取同一把锁。比如一个递归函数里有加锁操作,递归过程中这个锁会阻塞自己吗?如果不会,那么这个锁就是可重入锁(因为这个原因可重入锁也叫做递归锁)

Java里只要以Reetrant开头命名的锁都是可重入锁,而且JDK提供的所有线程的Lock实现类,包括synochronized关键字锁都是可重入的如果你需要不可重入锁只能自己去实现。

六、公平锁、非公平锁

如果多个线程申请一把公平锁,那么当锁释放的时候,先申请的先得到,非常公平。显然如果是非公平锁,后申请的线程可能先获取到锁,是随机或者按照其他优先级排序的。

对ReentrantLock类而言,通过构造函数传参可以指定该锁是否是公平锁,默认是非公平锁。一般情况下,非公平锁的吞吐量比公平锁大,如果没有特殊要求,优先使用非公平锁。

ReentrantLock构造器可以指定为公平或非公平

对于synchronized而言,它也是一种非公平锁,但是并没有任何办法使其变成公平锁

七、可中断锁

可中断锁,字面意思是“可以响应中断的锁”。

这里的关键是理解什么是中断Java没有提供任何直接中断某线程的方法只提供了中断机制。何谓“中断机制”?线程A向线程B发出“请你停止运行”的请求(线程B也可以自己给自己发送此请求),但线程B并不会立刻停止运行,而是自行选择合适的时机以自己的方式响应中断,也可以直接忽略此中断。也就是说,Java的中断不能直接终止线程而是需要被中断的线程自己决定怎么处理。这好比是父母叮嘱在外的子女要注意身体,但子女是否注意身体,怎么注意身体则完全取决于自己。[2]

回到锁的话题上来,如果线程A持有锁,线程B等待获取该锁。由于线程A持有锁的时间过长,线程B不想继续等待了,我们可以让线程B中断自己或者在别的线程里中断它,这种就是可中断锁

在Java中,synchronized就是不可中断锁Lock的实现类都是可中断锁可以简单看下Lock接口。

/* Lock接口 */
public interface Lock {

    void lock(); // 拿不到锁就一直等,拿到马上返回。

    void lockInterruptibly() throws InterruptedException; // 拿不到锁就一直等,如果等待时收到中断请求,则需要处理InterruptedException。

    boolean tryLock(); // 无论拿不拿得到锁,都马上返回。拿到返回true,拿不到返回false。

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 同上,可以自定义等待的时间。

    void unlock();

    Condition newCondition();
}

八、读写锁、共享锁、互斥锁

读写锁其实是一对锁,一个读锁(共享锁)和一个写锁(互斥锁、排他锁)。

看下Java里的ReadWriteLock接口,它只规定了两个方法,一个返回读锁,一个返回写锁。

记得之前的乐观锁策略吗?所有线程随时都可以读,仅在写之前判断值有没有被更改。

读写锁其实做的事情是一样的,但是策略稍有不同。很多情况下,线程知道自己读取数据后,是否是为了更新它。那么何不在加锁的时候直接明确这一点呢?如果我读取值是为了更新它(SQL的for update就是这个意思),那么加锁的时候就直接加写锁,我持有写锁的时候别的线程无论读还是写都需要等待;如果我读取数据仅为了前端展示,那么加锁时就明确地加一个读锁,其他线程如果也要加读锁,不需要等待,可以直接获取(读锁计数器+1)。

虽然读写锁感觉与乐观锁有点像,但是读写锁是悲观锁策略因为读写锁并没有更新前判断值有没有被修改过,而是在加锁前决定应该用读锁还是写锁乐观锁特指无锁编程,如果仍有疑惑可以再回到第一、二小节,看一下什么是“乐观锁”。

JDK提供的唯一一个ReadWriteLock接口实现类是ReentrantReadWriteLock。看名字就知道,它不仅提供了读写锁,而是都是可重入锁。 除了两个接口方法以外,ReentrantReadWriteLock提供了一些便于外界监控其内部工作状态的方法,这里就不一一展开。

九、回到悲观锁和乐观锁

这篇文章经历过一次修改,我之前认为偏向锁和轻量级锁是乐观锁,重量级锁和Lock实现类为悲观锁,网上很多资料对这些概念的表述也很模糊,各执一词。

先抛出我的结论:

我们在Java里使用的各种锁几乎全都是悲观锁synchronized偏向锁、轻量级锁重量级锁全是悲观锁JDK提供的Lock实现类全是悲观锁其实只要有“锁对象”出现,那么就一定是悲观锁。因为乐观锁不是锁,而是一个在循环里尝试CAS的算法。

那JDK并发包里到底有没有乐观锁呢?

有。java.util.concurrent.atomic包里面的原子类都是利用乐观锁实现的。

原子类AtomicInteger的自增方法为乐观锁策略

为什么网上有些资料认为偏向锁、轻量级锁是乐观锁?理由是它们底层用到了CAS?或者是把“乐观/悲观”与“轻量/重量”搞混了?其实,线程在抢占这些锁的时候,确实是循环+CAS的操作,感觉好像是乐观锁。但问题的关键是,我们说一个锁是悲观锁还是乐观锁,总是应该站在应用层看它们是如何锁住应用数据的,而不是站在底层看抢占锁的过程。如果一个线程尝试获取锁时发现已经被占用它是否继续读取数据,等后续要更新时再决定要不要重试?对于偏向锁轻量级锁来说,显然答案是否定的。无论是挂起还是忙等,对应用数据的读取操作都被“挡住”了。从这个角度看,它们确实是悲观锁。

退一步讲,也没有必要在这些术语上狠钻牛角尖,最重要的是理解它们的运行机制。想写得尽量简单一些,却发现洋洋洒洒近万字,只讲了个皮毛。深知自己水平有限,不敢保证完全正确,只能说路漫漫其修远兮,望指正。

参考

  1. ^这里存在一个问题,就是一个值从A变为B,又从B变回了A。这种情况下,CAS可能会认为值没有发生过变化,但实际上是有变化的。对此,并发包下有AtomicStampedReference提供根据版本号判断的实现。ABA问题。
  2. ^Java中断机制: https://www.cnblogs.com/jiangzhaowei/p/7209949.html