# JUC并发

多线程可有效地提升程序的执行性能,但是使用传统的多线程实现机制进行同步(wait和notify),编写难度较高,要考虑线程死锁,公平,资源管理等诸多因素,为了简化多线程同步处理机制,从JDK1.5开始,Java提供了JUC并发编程开发包(java.util.concurrent)支持,它的核心是高性能并发访问与安全修改。

JUC的核心类如下:

类名 作用
Executor Runnable任务的执行者
ExecutorService 线程池管理
ScheduledExecutorService 线程延迟调度池
CompletionService ExecutorService的扩展,可以获得线程执行结果
Callable 线程执行者,可以获取线程执行后的结果
Future 获取Callable线程执行结果
Semaphore 同步计数信号量
ReentrantLock 互斥锁
BlockingQueue 阻塞队列
CountDownLatch 同步辅助类,实现一组线程类的锁定处理
CyclicBarrier 同步辅助类,允许一组线程相互等待,达到既定线程个数后可解锁

# TimeUnit

它是一个描述时间单元的枚举类,包括:天(DAYS),时(HOURS),分(MINUTES),秒(SECONDS),毫秒(MILLISECONDS),微妙(MICROSECONDS),纳秒(NANOSECONDS)。

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        long convertToSecond = TimeUnit.SECONDS.convert(3, TimeUnit.HOURS);
        System.out.println(convertToSecond);
        long currentTimeMillies = System.currentTimeMillis();
        long after20Days = currentTimeMillies +TimeUnit.MILLISECONDS.convert(20, TimeUnit.DAYS);
        System.out.println("20 days later:"+new SimpleDateFormat("yyyy-MM-dd").format(new Date(after20Days)));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

之前Thread类提供的休眠方法Thread.sleep()是通过毫秒来定义时间,TimeUnit类中也有提供sleep()休眠的方法:

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        new Thread(()->{
            for(int i=0;i<10;i++) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+":"+i);
            }
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 原子操作

在多线程中经常出现多个线程对一个共享变量的并发修改,为了保证其正确性,操作的方法用synchronized关键字来操作,JDK1.5开始后,提供了java.util.concurrent.atomic来操作包,该包中提供一种用法简单,线程安全,性能高效的方式来更新变量。其中操作分为四类:

  • 基本类型:AtomicInteger,AtomicLong,AtomicBoolean。
  • 数组类型:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray。
  • 引用类型:AtomicReference,AtomicStampedReference,AtomicMarkableReference。
  • 对象属性修改类型:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater。

原子操作类最大的特点是能线程安全更新,帮助用户以一种线程安全的简单方式来进行更新,这些原子类数据保存属性上使用了volatile关键字,可以防止由于数据缓存造成的数据更新不一致问题。

注意JUC中两大类:CAS和AQS,CAS是java.util.concurrent.atomic包的基础,AQS是同步锁的实现基础。

CAS(Compare And Swap)是一条CPU并发原语,它的功能是判断内存某个位置的值是否是预期值,如果是则更改为新的值,该过程属于原子性操作,CAS并发原语在Java中的体现就是sun.misc.Unsafe类中的各个方法,调用Unsafe类中的CAS方法,JVM会帮开发者实现CAS汇编指令,完全依赖于硬件功能。CAS是乐观锁,是一种冲突重试机制,在并发不是很激烈的情况下,操作性能要好于悲观锁机制(synchronized同步处理)。

# 基本原子操作

基本原子操作类有3个:AtomicInteger,AtomicLong,AtomicBoolean。在32为系统中,64位的long和double变量由于被JVM当成两个分离的32位进行操作,所以不具有原子性,使用AtomicLong能让long类型操作保持原子性。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    static long csum = 0;
    public static void main(String[] args) throws InterruptedException {
        AtomicLong sum = new AtomicLong(0);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                while (true) {
                    sum.addAndGet(1);
                    csum++;
                }
            }).start();
        }
        TimeUnit.MICROSECONDS.sleep(2);
        System.out.println("线程安全sum:" + sum.get() + " 正常sum:" + csum);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 数组原子操作

数组原子操作也是3个,AtomicIntegerArray,AtomicLongArray和AtomicReferenceArray(对象数组)。

import java.util.concurrent.atomic.AtomicReferenceArray;

public class Main {
    static long csum = 0;

    public static void main(String[] args) throws InterruptedException {
        String strInfo[] = new String[] {
                "hello","world","welcome"
        };
        AtomicReferenceArray<String> strArr = new AtomicReferenceArray<String>(strInfo);
        System.out.println("change world to xie:"+strArr.compareAndSet(1, "world", "xie"));
        System.out.println("use temp object change hello to hi:"+strArr.compareAndSet(0, new String("hello"), "hi"));
        for(int i=0;i<strArr.length();i++) {
            System.out.println(i+":"+strArr.get(i));
            //System.out.println(i+":"+strInfo[i]);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

Java中的比较有两类,hashcode和equals,以及比较器。在CAS方法中比较字符串是否相等只是使用了==,因此使用匿名String对象无法比较并替换。

# 引用类型原子操作

引用类型原子操作有三个,AtomicReference,AtomicStampedReference,AtomicMarkableReference。

import java.util.concurrent.atomic.AtomicMarkableReference;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class Info{
    private String name;
    public Info(String n) {
        name=n;
    }
    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return this.name;
    }
    
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Info info1 = new Info("Apple");
        Info info2 = new Info("Banana");
        AtomicReference<Info> ref = new AtomicReference<Info>(info1);
        ref.compareAndSet(info1, info2);
        System.out.println(ref);

        //使用版本号
        AtomicStampedReference<Info> ref2 = new AtomicStampedReference<Info>(info1, 10);
        ref2.compareAndSet(info1, info2, 10, 100);
        System.out.println(ref2.getStamp());
        
        //使用标记
        AtomicMarkableReference<Info> ref3 = new AtomicMarkableReference<Info>(info1, true);
        ref3.compareAndSet(info1, info2, true, false);
        System.out.println(ref3.getReference()+" with flag:"+ref3.attemptMark(info2, false));
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

JUC提供AtomicMarkableReference和AtomicStampedReference是为了解决多线程访问下的数据ABA不同步问题,比如AB两个人想要修改文件服务器上的同一个文件,A打开了之后,有其他事情就放着做其他事情,B打开之后一直在修改,并保存了修改,A回来之后保存了下文件,B所做的修改全都丢失了。

因为ABA问题,可能造成CAS的数据更新错误,CAS的是否相等判断是使用==,因此可能会产生错误,这时候使用版本或者标记来判断是否要修改。

# 对象属性修改原子操作

JUC有三个属性原子操作类:AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater,通过这三个类可以线程安全地更新属性。

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class Info{
    private volatile String name;
    public Info(String n) {
        name=n;
    }
    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return this.name;
    }
    public void setName(String temp) {
        AtomicReferenceFieldUpdater atName = AtomicReferenceFieldUpdater.newUpdater(Info.class, String.class, "name");
        atName.compareAndSet(this, this.name, temp);
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Info info1 = new Info("li");
        System.out.println(info1);
        info1.setName("wang");
        System.out.println(info1);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 并发计算

使用原子操作类能保证多线程并发访问下的数据操作安全性,为了进一步加强多线程下的计算操作,从JDK 1.8之后开始提供累加器(DoubleAccumulator,LongAccumulator)和加法器(DoubleAdder,LongAdder)的支持。

import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.concurrent.atomic.DoubleAdder;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        DoubleAccumulator da = new DoubleAccumulator((x,y)->x*y, 10);
        System.out.println(da.doubleValue());
        da.accumulate(12);
        System.out.println(da.get());
        
        DoubleAdder doubleAdder = new DoubleAdder();
        doubleAdder.add(10);
        doubleAdder.add(30);
        doubleAdder.add(50);
        System.out.println(doubleAdder.sum());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# ThreadFactory

多线程的执行需要实现Runnable或Callable接口,为了进一步规划线程类的对象产生,JUC提供了一个ThreadFactory接口,可利用此接口来获取Thread类的实例化对象。

threadFactory

import java.util.concurrent.ThreadFactory;

class CustomThreadFactory implements ThreadFactory{
    private static final ThreadFactory INSTANCE = new CustomThreadFactory();
    private static int count=0;
    private static final String ThreadFlag="MyThread";

    private CustomThreadFactory() {
        // TODO Auto-generated constructor stub
        System.out.println("Init My Custom ThreadFactory!");
    }
    
    public static ThreadFactory GetInstance() {
        return INSTANCE;
    }

    @Override
    public Thread newThread(Runnable r) {
        // TODO Auto-generated method stub
        return new Thread(r,ThreadFlag+count++);
    }
    
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread myThread = CustomThreadFactory.GetInstance().newThread(()->{
            System.out.println(Thread.currentThread().getName()+" 执行了!");
        });
        myThread.start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 线程锁

Java传统的线程锁机制需要使用synchronized同步和Object中的wait()方法,notify()方法进行控制,但是它们并不易于使用,在JUC中提供了一个新的锁框架,java.util.concurrent.locks,它有两个核心接口:

  • Lock接口:支持各种不同语义("公平机制锁","非公平机制锁","可重入锁")的锁规则。
  • ReadWriteLock接口:针对线程的读或写提供不同的锁处理机制,在数据读取时采用共享锁,数据修改时采用独占锁,可以保证数据访问的性能。

JUC提供了大量不同类型的锁处理类:ReentrantLock,StampedLock,LockSupport,Semaphore,CountDownLatch,CyclicBarrier,Exchanger,CompletableFuture。

AbstractQueuedSynchronizer,简称AQS,实现同步器的基本组件,在JUC中它有3个支持类:AbstractOwnableSynchronizer,AbstractQueuedSynchronizer,AbstractQueuedLongSynchronizer。这3个类主要功能是实现锁以及阻塞线程执行的功能,JUC中有很多同步类都依赖于AQS支持,其中AbstractQueuedLongSynchronizer提供64位操作系统支持。

threadLock

# ReentrantLock

它提供了一种互斥锁(独占锁)机制,在同一个时间点内只允许有一个线程持有该锁,其他线程需要等待,然后在该线程释放时,竞争获取锁。它也是一个可重用锁,可被单线程重复获取。

ReentrantLock分为FairSync(公平锁)和NonfairSync(非公平锁),它们的区别体现在获取锁的机制上是否公平,锁是为了保护竞争资源,防止多个线程同时操作同一个资源而出错,在同一时间内只有一个线程能获取ReentrantLock,所有未获得锁的需要等待,ReentranLock通过一个FIFO队列来管理所有等待线程。

reentrantLock

import java.util.concurrent.locks.ReentrantLock;

class MyTicket{
    private int ticketCount=10;
    private ReentrantLock reentrantLock = new ReentrantLock();
    public void saleTicket() {
        try {
            this.reentrantLock.lock();
            if(this.ticketCount>0) {
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName()+" 卖出一张票,剩余:"+this.ticketCount--);
            }else {
                System.out.println("票卖完了!");
            }
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            this.reentrantLock.unlock();
        }
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        MyTicket ticket = new MyTicket();
        for(int i=0;i<10;i++) {
            new Thread(()->{
                ticket.saleTicket();
            },"售票窗口"+i).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

从上面看出,获取锁使用lock()方法,释放锁使用unlock()方法。锁的的获取有两种实现机制,公平机制和非公平机制,这里面会使用一个CLH队列,它是一个非阻塞的FIFO队列,往里面插入或移除一个节点时,在并发条件下不会产生阻塞,它是通过自旋锁和CAS保证节点插入和删除的原子性。公平机制和非公平机制区分在:

  • 公平锁:只有线程是CLH等待队列的表头时才能获取锁。
  • 非公平锁:前锁处于空闲状态,不管CLH中等待队列中的顺序,直接获取锁。

# ReentrantReadWriteLock

独占锁的最大特点是只允许一个线程进行操作,进行数据更新的时候可以保证完整性,但是在数据读取的时候会造成严重的性能问题。为了解决高并发下的快速访问与完全修改,JUC提供了ReentrantReadWriteLock,在读取的时候加读锁,写入的时候加写锁,两种锁是互斥的,由JVM控制。读锁属于共享锁,几个线程能共享,写锁只允许一个线程进行操作。

reentrantReadWriteLock

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyTicket{
    private int ticketCount=10;
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 
    public void saleTicket() {
        try {
            this.readWriteLock.writeLock().lock();;
            if(this.ticketCount>0) {
                Thread.sleep(200);
                System.out.println(Thread.currentThread().getName()+" 卖出一张票,剩余:"+ --this.ticketCount);
            }else {
                System.out.println("票卖完了!");
            }
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }
    public void showTicket() {
        this.readWriteLock.readLock().lock();
        try {
            Thread.sleep(50);
            System.out.println(Thread.currentThread().getName()+" 有票:"+this.ticketCount);
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            this.readWriteLock.readLock().unlock();
        }
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        MyTicket ticket = new MyTicket();
        for(int i=0;i<10;i++) {
            new Thread(()->{
                ticket.saleTicket();
            },"售票窗口"+i).start();
            new Thread(()->{
                ticket.showTicket();
            },"读取票线程"+i).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 乐观锁和悲观锁

乐观锁和悲观锁的区别:

  • 悲观锁:在每次拿数据时都认为别人会修改,它总是假设最坏的情况,每次拿数据时都会上锁,别人要拿数据得阻塞直到它拿到锁。传统的数据库里面都是这种锁机制,行锁,读锁,Java中的synchronized关键字也是这个原理。
  • 乐观锁:每次拿数据时认为别人不会修改该数据,不会上锁,在更新的时候用一个版本号来判断是不是自己读取时的数据,如果别人没有动过,那么就更新,乐观锁适用于多读的并发类型,可提高吞吐量。java中java.util.concurrent.atomic包下的原子变量类就是CAS的实现。

# StampedLock

读/写锁能保证在并发访问下的数据写入安全与读取性能,虽然读锁之间是不互斥的,但是读锁和写锁互斥,这在读线程非常多的情况下可能造成写线程长时间阻塞,从而导致写操作长时间饥饿,JUC针对读/写锁提出了改件方案,提供了无障碍锁(StampedLock),它的特点是有多个读线程之间不会相互影响,但是依然可保证多个写线程的独占操作。

这是JDK8之后新增的锁,跟读写不同,并不由AQS实现,它提供了三种模式来控制读写,在内部实现了自己的同步等待队列。

  • 写锁:使用writeLock方法获取,获取不到锁时阻塞,获取到锁后会得到一个stamp,后面通过这个stamp在unlockWrite方法中解锁。它是独占锁,是悲观锁。
  • 读锁:使用readLock方法获取,超出可用资源时会阻塞。它是悲观锁。
  • 乐观读锁:通过tryOptimisticRead方法获取,只有在当前模式不是写锁时才能获取乐观读锁,可通过validate和获取后的stamp来判断写锁是否被获取,它常被用来在较短只读代码段中,用来减少争用并提高吞吐量,在乐观读模式中字段读取可能会不一致,需要反复使用validate()来检查一致性,并存储一个副本反复对比。

# 模式转换

使用tryConvertToWriteLock,tryConvertToReadLock和tryConvertToOptimisticRead可将当前模式转换为写锁,读锁,乐观锁。

当前模式 转换为写锁 转换为读锁 转换为乐观锁
写锁 直接返回当前stamp 释放写锁,获取读锁,返回stamp 直接释放写锁,返回stamp
读锁 释放读锁,获取写锁,返回stamp 返回当前stamp 直接释放读锁,返回stamp
乐观锁 通过CAS立即获取写锁,成功返回stamp,失败返回0 通过CAS立即获取读锁,成功返回stamp,失败返回0 若乐观锁有效,返回stamp

读锁和写锁转换,读锁和写锁转换为乐观锁,是直接转换的;乐观锁转换为读锁,写锁是需要尝试的。

实例:

import java.util.concurrent.locks.StampedLock;

class MyTicket {
    private int ticketCount = 1000;
    private StampedLock stampedLock = new StampedLock();

    public void saleTicketWriteBlock() { // 排他锁
        long stamp = stampedLock.writeLock();
        try {
            ticketCount -= 1;
        } finally {
            stampedLock.unlockWrite(stamp);
        }

    }

    public void saleTicket() { // write
        long stamp = stampedLock.readLock();
        long writeStamp = 0;
        boolean canWrite = true;
        try {
            while (canWrite) {
                writeStamp = this.stampedLock.tryConvertToWriteLock(stamp);
                System.out.println("--------stampedLock.writeLock:" + writeStamp);
                if (writeStamp != 0L) {
                    stamp=writeStamp;
                    if (this.ticketCount > 0) {
                        Thread.sleep(20);
                        System.out.println(Thread.currentThread().getName() + " 卖出一张票,剩余:" + --this.ticketCount);
                    } else {
                        System.out.println(Thread.currentThread().getName() +"票卖完了!");
                    }
                    canWrite=false;
                    break;
                } else {
                    //System.out.println(Thread.currentThread().getName() + " 读锁: "+stampedLock.isReadLocked()+" 写锁:"+stampedLock.isWriteLocked());
                    stampedLock.unlockRead(stamp);
                    stamp = stampedLock.writeLock();
                }
            }
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            //System.out.println(Thread.currentThread().getName()+":");
            this.stampedLock.unlock(stamp);
        }
        
    }

    public void showTicket() { // 升级,只读方法

        long stamp = this.stampedLock.tryOptimisticRead();
        System.out.println("stampedLock.tryOptimisticRead:" + stamp);
        int localTicketNum = this.ticketCount;
        if (!stampedLock.validate(stamp)) {
            stamp = stampedLock.readLock();
            localTicketNum = this.ticketCount;
            stampedLock.unlockRead(stamp);
        } else {
            System.out.println("使用了乐观锁!");
        }
        
        System.out.println(Thread.currentThread().getName() + " 的票:" + localTicketNum);
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MyTicket ticket = new MyTicket();
        for (int i = 0; i < 200; i++) {
            new Thread(() -> {
                ticket.saleTicket();
            }, "售票窗口 " + i).start();
        }
        
        for(int i=0;i<500;i++) {
            new Thread(() -> {
                ticket.showTicket();
            }, "读取票线程 " + i).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

# Condition

JUC中允许用户自己创建锁对象,可通过实现Condition接口,Condition提供了与Object类中类似的线程控制方法,自己来定义使用的锁。

condition

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static String name = "li";
    public static void main(String[] args) throws InterruptedException {
        Lock reentrantlock = new ReentrantLock();
        Condition condition = reentrantlock.newCondition();
        reentrantlock.lock();
        
        new Thread(()->{
            reentrantlock.lock();
            name="xie";
            condition.signal();
            reentrantlock.unlock();
        },"数据处理").start();
        
        condition.await();
        System.out.println("name="+name);
        reentrantlock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

上面通过condition的await()和signal()方法实现了线程的锁定和唤醒。注意,使用这两个方法需要使用reentrantlock.lock提前处理,跟Object的wait,notify必须在synchronized方法中使用一样,condition还能更加精细地控制多线程的休眠和唤醒:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class DataPool {
    private static final int MAX_SIZE = 10;
    private Lock lock = new ReentrantLock();

    private final Condition writeCondition = lock.newCondition();
    private final Condition readCondition = lock.newCondition();

    private final Object[] dataPool = new Object[MAX_SIZE];
    private int count = 0;

    public void PutEle(Object obj) {
        this.lock.lock();
        try {
            if (this.count == MAX_SIZE) {
                this.writeCondition.await();
            }
            this.dataPool[this.count++] = obj;
            this.readCondition.signal();
            System.out.println(Thread.currentThread().getName() + " 线程 写入数据:" + obj);
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println(e);
        } finally {
            this.lock.unlock();
        }
    }

    public Object getEle() {
        this.lock.lock();
        Object takeData = null;
        try {
            if (this.count == 0) {
                this.readCondition.await();
            }
            takeData = this.dataPool[--this.count];
            this.writeCondition.signal();
        } catch (Exception e) {
            // TODO: handle exception
        } finally {
            this.lock.unlock();
        }
        return takeData;
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        DataPool buffer = new DataPool();
        for(int i=0;i<10;i++) {
            int tempData=i;
            new Thread(()->{
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                buffer.PutEle(tempData);
            },"写线程"+i).start();
            new Thread(()->{
                try {
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName()+" 消费数据:"+buffer.getEle());
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            },"读线程"+i).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75

# LockSupport

Thread类从JDK 1.2版本开始为了防止死锁,废除了Thread类中的一些线程控制方法,比如suspend(),resume()等。但是有开发者认为,使用被废除的方法会更直观,在JUC中提供了这几个方法的替代类:

方法 描述
static void park() 阻塞线程
static void park(Object blocker) 阻塞指定线程对象
static void parkNanos(long nanos) 阻塞线程并设置线程阻塞时间
static void parkNanos(Object blocker,long nanos) 阻塞指定线程对象并设置阻塞时间
static void unpark(Thread thread) 线程解锁
import java.util.concurrent.locks.LockSupport;

public class Main {
    public static String name = "li";

    public static void main(String[] args) {
        Thread mainThread = Thread.currentThread();
        new Thread(() -> {
            System.out.println("线程:" + Thread.currentThread().getName() + " 在数据处理!");
            name = "xie";
            LockSupport.unpark(mainThread);
        }, "子线程1").start();
        LockSupport.park(mainThread);
        System.out.println("name=" + name);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# Semaphore

服务器提供的资源不是无限地,当并发访问线程量较大时需要针对所有可用资源进行线程调度:

方法名 描述
Semaphore(int permits) 设置调度资源总数,采用非公平机制
Semaphore(int permits,boolean fair) 设置调度资源总数与公平机制
void acquire() 获取操作许可
int availablePermits() 判断当前是否有空闲资源
void release(int permits) 释放资源
import java.util.concurrent.Semaphore;

public class Main {
    public static void main(String[] args) {
        Semaphore sem = new Semaphore(3);
        for(int i=0;i<10;i++) {
            new Thread(()->{
                try {
                    sem.acquire();
                    if(sem.availablePermits() >=0) {
                        System.out.println(Thread.currentThread().getName()+" 抢占成功!");
                    } else {
                        System.out.println(Thread.currentThread().getName()+" 抢占失败!等待并再次抢占!");
                    }
                    System.out.println(Thread.currentThread().getName()+" 开始业务!");
                    Thread.sleep(200);
                    System.out.println(Thread.currentThread().getName()+"--------正在处理--------!");
                    System.out.println(Thread.currentThread().getName()+" 结束业务!");
                    sem.release();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# CountDownLatch

CountDownLatch可以保证一组子线程全部执行完毕后再执行主线程的操作。它是通过一个线程个数计数器实现的同步处理操作,初始时CountDownLatch设置一个线程执行总数,每当一个子线程执行完毕后执行一个减1的操作,当所有子线程执行完毕后,恢复主线程。

方法 描述
CountDownLatch(int count) 定义等待子线程总数
void await() 主线程阻塞,等待子线程执行
void countDown() 子线程执行完后减少等待数量
long getCount() 获取当前等待数量
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        for(int i=0;i<3;i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+" 做完!");
                latch.countDown();
            },"线程"+i).start();
        }
        latch.await();
        System.out.print("准备干活");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# CyclicBarrier

它能保证多个线程达到某个公共屏障点(Common Barrier Point)时候才执行,如果没有达到屏障点那么线程将等待。比如坐地铁安检一波人检查完后才会有下一波。它就好比栅栏一样,保证若干个线程的并行执行,同时可以利用方法更新屏障点的状态以便更加方便的控制。

方法名 描述
CyclicBarrier(int parties) 设置屏障点数量
CyclicBarrier(int parties,Runnable barrierAction) 设置屏障点数量,达到屏障点后要执行的线程。
int await() 等待线程数量达到屏障点
int await(long timeout, TimeUnit unit) 等待线程数量达到屏障点,并设置等待超时时间
int getNumberWaiting() 获取等待子线程数量
void reset() 重置屏障点计数
boolean isBroken() 查询是否为中断状态
int getParties() 获取屏障点数量
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class MyCyclicBarrier implements Runnable {
    public static int ticket = 6;
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        System.out.println("-----------3个子线程处理完毕-------------------");
    });

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " 等待执行!");
            try {
                cyclicBarrier.await(5, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.out.println(Thread.currentThread().getName() + " 发生超时!");
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 卖了一次票:" + this.ticket--);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            System.out.println(Thread.currentThread().getName() + " 发生Broken异常, 是否Broken:" + cyclicBarrier.isBroken());
            e.printStackTrace();
            //System.exit(-1);
        }
    }
}

public class Main {
    public static void main(String[] args) {
        MyCyclicBarrier mc1 = new MyCyclicBarrier();
        for (int i = 0; i < 4; i++) {
            new Thread(mc1, "业务线程 " + i + " :").start();
        }
        try {
            Thread.sleep(3000);
            System.out.println("正在等待的子线程数:" + mc1.cyclicBarrier.getNumberWaiting());
            mc1.cyclicBarrier.reset();
            System.out.println("正在等待的子线程数:" + mc1.cyclicBarrier.getNumberWaiting());
            Thread.sleep(3000);
        } catch (Exception e) {
            System.out.println("发生异常!");
            System.out.println(e);
        }
        for (int i = 0; i < 2; i++) {
            new Thread(mc1, "业务 " + i + " :").start();
        }
        System.out.println("程序结束!");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

上面这段程序运行结果为:

业务线程 3 : 等待执行!
业务线程 1 : 等待执行!
业务线程 2 : 等待执行!
业务线程 0 : 等待执行!
-----------3个子线程处理完毕-------------------
业务线程 1 : 卖了一次票:6
业务线程 3 : 卖了一次票:5
业务线程 0 : 卖了一次票:4
正在等待的子线程数:1
正在等待的子线程数:0
业务线程 2 : 发生Broken异常, 是否Broken:false
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at MyCyclicBarrier.run(Main.java:17)
    at java.lang.Thread.run(Unknown Source)
程序结束!
业务 0 : 等待执行!
业务 1 : 等待执行!
业务 0 : 发生超时!
业务 1 : 发生Broken异常, 是否Broken:true
java.util.concurrent.TimeoutException
业务 0 : 卖了一次票:3
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at MyCyclicBarrier.run(Main.java:17)
    at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at MyCyclicBarrier.run(Main.java:17)
    at java.lang.Thread.run(Unknown Source)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

注意:

  • 下面两种情况会抛出BrokenBarrierException
    • 如果barrier在有任意线程等待时被reset()
    • 在调用await()或有线程等待的时候barrier是broken状态。
  • 如果有任何线程在等待时被中断或者自己超时,然后其他等待线程会抛出BrokenBarrierException异常,然后barrier处于broken状态。
  • 抛出BrokenBarrierException时,程序会中断。抛出超时异常时,程序还是会继续执行。

# CyclicBarrier和CountDownLatch区别

  • CountDownLatch采用单次计数,计数操作只执行一次,如果在执行中线程出现了错误,计数无法重新开始。
  • CyclicBarrier采用循环计数,如果子线程中出现了错误,可使用reset()进行重置,继续使用。

# Exchanger

我们上面生产者和消费者模型中有一个公共区域进行数据的保存和获取,在JUC中专门提供了一个交换区域的程序类:java.util.concurrent.Exchanger类。

import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<String>();
        new Thread(() -> {
            try {
                Thread.sleep(200);
                for (int i = 0; i < 20; i++) {
                    exchanger.exchange("hello:" + i);
                    // System.out.println(Thread.currentThread().getName()+" 生产数据");
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "producer ").start();
        new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    Thread.sleep(50);
                    System.out.println(exchanger.exchange(null));
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }, "consumer :" ).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

exchanger是一个用于线程间数据交换的类,它提供一个同步点,在同步点的两个线程可交换彼此的数据,两个线程通过exchange方法交换数据,如果第一个线程先执行exchange方法,会等待第二个线程执行exchange,当两个线程到达同步点就可以交换数据,使用exchange的重点是成对的线程使用该方法。