# 并发集合
传统的Java集合属于"非线程安全的",虽然Java追加了Collections工具类来实现集合的同步处理,但是并发效率不高,为了更好地支持高并发任务处理,JUC中提供了支持高并发的处理类,为了保证集合操作的一致性,这些高并发集合类依然实现了集合标准接口,它们继承关系如下:
Java 7.0的并发集合类图:
传统集合进行并发访问会产生java.util.ConcurrentModificationException,如下程序:
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
List<String> l1 = new ArrayList<String>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
l1.add(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + ":" + l1);
}, "线程" + i).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# CopyOnWriteArrayList
它是基于数组实现的集合并发类,每次使用它进行数据的增删改查都会创建一个新的数组,并将更新后的数据复制到新建的数组中,由于每次新建数组,它的并发性能不高,但是在进行数据遍历查找时性能会比较高,在使用迭代器时不支持删除操作。
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class Main {
public static void main(String[] args) {
List<String> l1 = new CopyOnWriteArrayList<String>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
l1.add(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + ":" + l1);
}, "线程" + i).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# CopyOnWriteArraySet
它提供了一种无序的线程安全集合结构,同HashSet类似,但是HashSet是基于散列方式存放,而CopyOnWriteArraySet是基于数组,注意它的数据保存是基于CopyOnWriteArrayList,它们之间的继承关系如下:
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class Main {
public static void main(String[] args) {
Set<String> l1 = new CopyOnWriteArraySet<String>();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
l1.add(Thread.currentThread().getName());
System.out.println(Thread.currentThread().getName() + ":" + l1);
}, "线程" + i).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# ConcurrentHashMap
它是线程安全的哈希表实现类,它将哈希表分成许多片段,每个片段除了保存哈希数据外还提供一个可重用的"互斥锁",以片段的形式实现多线程操作,同一个片段内多个线程访问是互斥的,不同片段的访问用的是异步处理方式,使得它在保证性能的前提下又可以实现数据的正确修改。它的基本结构如下:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class Main {
public static void main(String[] args) {
Map<String,Integer> l1 = new ConcurrentHashMap<String,Integer>();
for (int i = 0; i < 20; i++) {
final int tempI=i;
new Thread(() -> {
l1.put(Thread.currentThread().getName(),tempI);
System.out.println(Thread.currentThread().getName() + ":" + l1);
}, "线程" + i).start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ConcurrentSkipListMap
跳表是一种与平衡二叉树性能类似的数据结构,主要在链表上使用:
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(20);
Map<String,Integer> l1 = new ConcurrentSkipListMap<String,Integer>();
for (int i = 0; i < 20; i++) {
final int tempI=i;
new Thread(() -> {
l1.put(Thread.currentThread().getName(),tempI);
System.out.println(Thread.currentThread().getName() + ":" + l1);
latch.countDown();
}, "线程" + i).start();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(l1.get("线程5"));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# ConcurrentSkipListSet
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(20);
Set<String> l1 = new ConcurrentSkipListSet<String>();
for (int i = 0; i < 20; i++) {
final int tempI=i;
new Thread(() -> {
l1.add(Thread.currentThread().getName()+"-"+tempI);
System.out.println(Thread.currentThread().getName() + ":" + l1);
latch.countDown();
}, "线程" + i).start();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(l1.contains("线程5-5"));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 阻塞队列
队列是先进先出的数据结构,可利用队列进行批量数据保存,比如在生产者消费者模型中,可以使用缓存队列来暂存生产出来的东西。这个缓存队列是不能无限制存放数据的,生产者发现队列已经达到保存上限,需暂停生产,消费者如果发现队列总没有东西,应该等有数据后再进行消费,这就需要在队列上有线程等待与唤醒机制,JUC中提供了两个新的阻塞队列接口:BlockingQueue和BlockingDeque。它的继承结构如下:
# BlockingQueue
它属于单端阻塞队列,所有数据按照FIFO算法进行保存和获取,它提供下面几个子类,ArrayBlockingQueue(数据结构),LinkedBlockingQueue(链表单端阻塞队列),PriorityBlockingQueue(优先级阻塞队列),SynchronousQueue(同步队列)。继承结构如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
//BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
//链表阻塞队列 链表默认情况允许保存数据长度为Integer.MAX_VALUE。
//BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
//优先级度列 拥有数据排序的支持
//BlockingQueue<String> queue = new PriorityBlockingQueue<String>();
//同步队列
BlockingQueue<String> queue = new SynchronousQueue<String>();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 3; j++) {
try {
TimeUnit.SECONDS.sleep(2);
String tempMsgString = Thread.currentThread().getName() + ":" + j;
queue.put(tempMsgString);
System.out.println(tempMsgString+":"+queue.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "生产者 "+i+":").start();
}
for(int i=0;i<3;i++) {
new Thread(()->{
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 取数据:"+queue.take());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"消费者 "+i+":").start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# BlockingDeque
BlockingDeque为双端阻塞队列,可实现FIFO与FILO操作,只有LinkedBlockingDeque队列。继承结构如下:
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
BlockingDeque<String> deque = new LinkedBlockingDeque<String>(5);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
for (int j = 0; j < 3; j++) {
try {
TimeUnit.SECONDS.sleep(2);
String tempMsgString = Thread.currentThread().getName() + ":" + j;
deque.putFirst(tempMsgString);
System.out.println(tempMsgString+":"+deque.size());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}, "生产者 "+i+":").start();
}
for(int i=0;i<3;i++) {
new Thread(()->{
while(true) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 取数据:"+deque.takeLast());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
},"消费者 "+i+":").start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 延迟队列
JUC中提供自动弹出数据的延迟队列DelayQueue,该类属于BlockingQueue接口子类,对于延迟操作的计算通过Delayed接口进行计算,它的组成结构如下:
下面是使用延迟队列来实现一个简单缓存系统:
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class ResourceItem<T> implements Delayed{
private long expire;
private long delay;
private T itemObj;
public ResourceItem(T obj,long delay,TimeUnit unit) {
this.itemObj=obj;
this.delay=TimeUnit.MILLISECONDS.convert(delay, unit);
this.expire = System.currentTimeMillis()+this.delay;
}
@Override
public int compareTo(Delayed o) {
// TODO Auto-generated method stub
//return (int)(this.delay-this.getDelay(TimeUnit.MILLISECONDS));
return (int)(this.delay-o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
// TODO Auto-generated method stub
//System.out.println("get delayed:"+unit.convert(this.expire-System.currentTimeMillis(), unit));
return unit.convert(this.expire-System.currentTimeMillis(), unit);
}
@Override
public String toString() {
// TODO Auto-generated method stub
return this.itemObj.toString();
}
public T getItem() {
return this.itemObj;
}
}
class Cache<K,V>{
class Pair{
private K keyK;
private V valueV;
public Pair(K key,V value) {
this.keyK=key;
this.valueV=value;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "key:"+keyK+" value:"+valueV;
}
}
private Map<K, V> cacheObjects = new ConcurrentHashMap<K, V>();
private BlockingQueue<ResourceItem<Pair>> queue = new DelayQueue<ResourceItem<Pair>>();
public Cache() {
Thread thread = new Thread(()->{
while(true) {
try {
ResourceItem<Pair> resourceItem;
resourceItem = queue.take();
if(resourceItem != null) {
Pair pairItem = resourceItem.getItem();
cacheObjects.remove(pairItem.keyK,pairItem.valueV);
System.out.println("removed item,key:"+pairItem.keyK+" value:"+pairItem.valueV);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
thread.setDaemon(true);
thread.start();
}
public V getValue(K key) {
return this.cacheObjects.get(key);
}
public void putValue(K key,V value,long delayTime) throws InterruptedException {
V oldValue = this.cacheObjects.put(key, value);
if(oldValue != null) {
queue.remove(oldValue);
}
this.queue.put(new ResourceItem<Pair>(new Pair(key,value),delayTime,TimeUnit.SECONDS));
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
Cache<Integer,String> MyCache = new Cache<Integer,String>();
MyCache.putValue(1, "li",2);
MyCache.putValue(2, "wang",3);
MyCache.putValue(3, "xun",5);
System.out.println(MyCache.getValue(1));
TimeUnit.SECONDS.sleep(5);
System.out.println(MyCache.getValue(2));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# 线程池
多线程提升了程序执行性能,但是也会带来线程资源调度的损耗(线程的创建和回收),会导致程序的响应速度变慢,为了实现合理的线程操作,需要提高线程的可管理性,降低资源损耗,在JUC中提供了线程池的概念,可通过java.util.concurrent.Executors类完成。
Executors通过添加线程池管理来定义了运行和管理线程的高层API以支持大规模应用,在Java 7中executors的类图如下:
# Executor和Thread区别
- 使用Thread性能差,线程缺乏统一管理,容易因为死锁或者占用过多资源导致死机。
- 使用线程池重用已有的线程,减少性能开销,有效控制最大并发线程数,提高系统资源使用率,除此之外还能提供定时执行,单线程,并发数等控制。
# Executor和ExecutorService和Executors区别
- Executor定义了execute()方法来接收Runnable接口对象,且不返回任何结果。
- ExecutorService接口继承了Executor接口,ExecutorService中的submit()可接收Runnable和Callable接口对象,可返回一个Future对象。它还能使用控制线程池的方法,比如shutDown()。
- Executors提供工厂方法来创建不同类型的线程池。它可创建四种线程池:
- 缓存线程池(CachedThreadPool):线程池中的每个子线程可被重用,保存了所有的用户线程,随着处理量的增加可以持续进行用户线程的创建。
- 固定大小线程池(FixedThreadPool):保存所有的内核线程,它们被不断重用,并不保留任何用户线程。
- 单线程池(SingleThreadPool):只维护一个内核线程,所有执行者依据顺序排队获取线程资源。
- 定时调度池(ScheduledThreadPool):按照计划周期性地完成线程中的任务,包含内核线程和用户线程,可提供许多用户线程。
# 用户线程和内核线程
多线程的实现过程本身依赖于操作系统,JDK最初只有单核CPU,当时的多线程依赖于软件平台的实现,用户线程和内核线程区别如下:
- 用户线程可以在不支持多线程的系统中存在;内核线程需要操作系统与硬件的支持。
- 在只有用户线程的系统中,CPU依然以进程为单位,处于进程中的多个线程是通过程序来实现的;在有内核支持的多线程系统中,CPU调度以线程为单位,由操作系统调度。
- 用户线程通过进程划分,一个进程系统只会为其分配一个处理器,所以用户线程无法调用系统的多核处理器;内核线程可以调度一个程序在多核处理器上执行,提高处理性能。
- 用户线程执行系统指令调用时导致其所属进程被中断;内核线程执行系统指令调用时,只导致该线程被中断。
# 创建线程池
Executors类创建线程池需要两类接口描述:ExecutorService(线程池)与ScheduledExecutorService(调度线程池),它们的继承关系如下:
由于线程池接口之间有继承关系,所有的线程池实现都是需要为其提供一系列用户线程后才可进行统一的执行调度。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) throws InterruptedException {
//创建缓存线程池,线程数量最多不能超过Integer.MAX_VALUE
//ExecutorService service = Executors.newCachedThreadPool();
//固定长度线程池
ExecutorService service = Executors.newFixedThreadPool(3);
//单线程池
//ExecutorService service = Executors.newSingleThreadExecutor();
for(int i=0;i<100;i++) {
service.submit(()->{
System.out.println(Thread.currentThread().getId()+"::::"+Thread.currentThread().getName());
});
}
service.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
通过Executors获取的线程实际上都是Executor接口的实例,在Executor接口中提供的execute()方法可进行多线程保存,本例使用submit方法,该方法追加了执行任务是否为空的判断,它会调用execute方法。
线程池中有三个核心概念:
- task:表示真正执行的任务,所有线程任务追加后不会立刻执行。
- worker:所有线程池中的任务都需要通过worker来执行,worker数量受到线程池容量限制。
- reject:拒绝策略,如果线程池中的线程满了,可以选择离开或等待。
下面是线程调度池:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executeService = Executors.newScheduledThreadPool(3);
for(int i=0;i<10;i++) {
int tempI=i;
executeService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getName()+" i="+tempI);
}
}, 3, 2, TimeUnit.SECONDS);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
传入Callable接口实例,利用Future接口获取线程的返回结果。
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Set<Callable<String>> threads = new HashSet<Callable<String>>();
for(int i=0;i<10;i++) {
int tempI=i;
threads.add(()->{
return Thread.currentThread().getId()+"::"+Thread.currentThread().getName()+" is "+tempI;
});
ExecutorService service = Executors.newFixedThreadPool(3);
List<Future<String>> results = service.invokeAll(threads);
for(Future<String> future:results) {
System.out.println(future.get());
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# CompletionService
它是一个异步处理模式,其能异步获取线程池的返回结果。它将Executor(线程池)和BlockingQueue(阻塞队列)结合在一起,同时主要使用Callable定义线程任务,这个过程中生产者不断地将Callable线程任务保存进阻塞队列,线程池作为消费者不断地把其中的任务取出,返回结果。CompletionService的实现结构如下:
CompletionService接口可以接收Callable或Runnable实现的线程任务,并且可以通过ExecutorCompletionService子类实例化接口对象:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class MyThread implements Callable<String>{
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
long currentTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName()+" started!");
return Thread.currentThread().getName()+":"+(System.currentTimeMillis()-currentTime);
}
}
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
for(int i=0;i<10;i++) {
completionService.submit(new MyThread());
}
for(int i=0;i<10;i++) {
System.out.println("得到数据:"+completionService.take().get());
}
service.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# ThreadPoolExecutor
通过Executors类可实现线程池的创建,通过Executors类创建的所有线程池都是基于ThreadPoolExecutor类实现创建,在一些特殊环境下开发者可以直接利用ThreadPoolExecutor类结合阻塞队列与拒绝策略创建属于自己的线程池。其中Executors创建线程池的源码如下:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
2
3
4
5
6
7
线程池中的资源是有限的,对超出线程池容量的部分任务线程将拒绝其执行,有如下四种拒绝策略,通过ThreadPoolExecutor的内部类形式定义:
- ThreadPoolExecutor.AbortPolicy(默认):在任务添加到线程池中被拒绝的时候,会抛出RejectedExecutionException异常。
- ThreadPoolExecutor.CallerRunsPolicy:当任务添加到线程池中被拒绝的时候,会在线程池当前正在执行线程的worker里找一个来处理此线程。
- ThreadPoolExecutor.DiscardOldestPolicy:当任务添加到线程池中被拒绝的时候,线程池会放弃队列中等待最长时间的任务,并将被拒绝的任务添加到队列中。
- ThreadPoolExecutor.DiscardPolicy:当任务添加到线程池中被拒绝的时候,直接丢弃。
它们的类组成如下:
来看实例:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(3);
ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2, 3, 3, TimeUnit.SECONDS,queue,Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<10;i++) {
myExecutor.submit(()->{
System.out.println("begin name:"+Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("end name:"+Thread.currentThread().getName());
});
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26