简单了解并发集合
# ConcurrentHashMap
可以理解成线程安全的 HashMap,在 Java7 与 Java8 中的实现不一样。在使用的时候只需要调用 Maps.newConcurrentMap 即可构建一个 ConcurrentHashMap
# 1.7实现
一图解释1.7实现:
ConcurrentHashMap 存入数据时,会计算要 put 的 key 的位置,获取指定位置的 Segment。并且对 Segment 加锁,这么做只是避免了对整个集合加锁
# 结构
1.7之前的 ConcurrnetHashMap 由多个 Segment 组成,每一个 Segment 都类似 HashMap
Segment 的个数一旦初始化就不能改变,每一个 Segment 同时只会被一个线程修改,Segment 的个数决定了有多少个线程可以并发执行
当对 Segment 中的数据进行修改时,需要先获得锁,而 Segment 继承自可重入锁,获取锁只需要调用 tryLock 方法
也就是说,1.7的 ConcurrnetHashMap 只是多个可以上锁的 hashMap 拼起来的而已
# 初始化
在实例化对象的时候,会初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容
以下是初始化源码
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
// 参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 记录段偏移量
this.segmentShift = 32 - sshift;
// 记录段掩码
this.segmentMask = ssize - 1;
// 设置容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
上面的主要流程是:
1,参数校验 2,如果 concurrencyLevel 大于最大值,Segment 数目设为为最大值。无参构造默认值是16;否则寻找 concurrencyLevel 之上最近的2的幂次方值,作为初始化容量大小 3,记录 segmentShift 偏移量,这个值为【容量 = 2 的N次方】中的 N,在后面 Put 时计算位置时会用到。默认是 32 - sshift = 28
记录 segmentMask,默认是 ssize - 1 = 16 -1 = 15 初始化 segments[0],默认大小为 2,负载因子 0.75,扩容阀值是 2*0.75=1.5,插入第二个值时才会进行扩容
在 put 方法中,当其他的 segment 被需要时,使用 Segment[0] 的容量和负载因子创建一个新的 HashEntry 数组
由于可能有多个线程对这个集合进行操作,在复制时会多次判断这个 segment 是否为 null,并用 CAS 来进行操作
# put 方法
由 hash 函数计算出这个键值对属于哪一个 segment,hash 函数与 HashMap 中的不同,segmentShift 是偏移量。Segment 继承了 ReentrantLock,所以直接对自己上锁即可
int j = (hash >>> segmentShift) & segmentMask;
计算出数据属于哪一个 segment 之后有两种情况, segment 没有初始化和已经初始化的情况:
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
s = ensureSegment(j);
return s.put(key, hash, value, false);
2
3
4
没有初始化,执行 ensureSegment 方法。它也只干了三件事:
1,检查计算得到的位置的 Segment 是否为 null.,为 null 继续初始化,不然的话使用Segment[0]的容量和负载因子创建一个HashEntry数组 2,再次检查计算得到的指定位置的Segment是否为null,使用创建的HashEntry数组初始化这个Segment 3,自旋判断计算得到的指定位置的Segment是否为null,使用CAS在这个位置赋值为Segment
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE;
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0];
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
已经初始化:
tryLock() 获取锁,获取不到使用 scanAndLockForPut 方法(这个方法中自旋调用 tryLock 方法,当自旋次数大于指定次数时,使用 lock() 阻塞获取锁)继续获取。后续的步骤和 hashmap 基本类似,不再说明
# 扩容
和 hashmap 一样,大于扩容阈值后开始扩容,不过只能在 segment 内部进行扩容,segment 的数量是不能改变的
ConcurrentHashMap 的扩容只会扩容到原来的两倍。老数组里的数据移动到新的数组时,位置要么不变,要么变为 index+oldSize,参数里的 node 会在扩容之后使用链表头插法插入到指定位置
为什么位置要么不变,要么变为 index+oldSize?
因为扩容后哈希函数多了一位比特,这位要么是1要么是0,而这对应的就是 index 的有无
# 1.8实现
# 结构
1.7 中,每个数据存放和取出都需要计算两次位置(查找段一次,查找 hashMap 一次),我们在1.8中优化了这个问题
现在的 ConcurrnetHashMap 和普通的 HashMap 结构差不多,是 Node 数组和链表加红黑树,这样就少了一次 hash 值的计算。还有一个优化就是会将 node 数组扩容

# put
ConcurrentHashMap 的初始化是通过自旋和 CAS 操作完成的
使用 hash 函数计算出位置后:
1,如果当前位置的 Node 数组没有被初始化,使用自旋和 CAS 来构造这个 Node 数组 2,如果 Node 数组为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则进行重试,这里不会上悲观锁 3,如果当前位置的 MOVED == -1(扩容标识),则需要进行扩容 5,如果不需要扩容则会用 synchronized 锁写入数据,synchronized 只锁定当前链表或红黑二叉树的首节点,这样只要 hash 不冲突,就不会产生并发 4,若检测到其他线程正在扩容(节点 hash 值为 MOVED),当前线程会协助迁移数据(helpTransfer),加速扩容过程。具体方式是每个线程负责一个区间的节点迁移,提升扩容效率。
扩容:
1,和 hashmap 一样,大于扩容阈值后开始扩容 2,同 HashMap 一样,链表也会在长度达到 8 的时候转化为红黑树,这样可以提升大量冲突时候的查询效率 3,ConcurrentHashMap 的扩容会将 node 数组扩容
# HashTable 和 Collections.synchronizedMap
线程安全的 HashMap,大多数方法用 synchronized 标记实现同步。HashTable 是较为远古的使用 Hash 算法的容器结构了,现在基本已被淘汰,单线程转为使用 HashMap,多线程使用 ConcurrentHashMap
在 java17 中,你甚至都找不到它
我们在使用的时候可以直接调用 Collections.synchronizedMap 来创建一个线程安全的 HashMap,他的实现原理和 HashTable 基本类似
# CopyOnWriteArrayList
线程安全的 ArrayList
读操作远远大于写操作使用,因为这个结构对读操作不上锁,并对所有可变操作创建新数组并修改新数组来实现,在修改后直接将引用指向修改完的数组。所有可变操作都会上锁以保证操作的原子性
# Vector 和 Collections.synchronizedList
线程安全的 ArrayList,因为里面装的是对象数组
protected Object[] elementData;
使用 synchronized 实现同步
一般在项目中不直接使用 Vector 了,我们现在使用 Collections.synchronizedList 来提供并发 list,它的同步原理和 Vector 类似:
public static <T> List<T> synchronizedList(List<T> list) {
return (list instanceof RandomAccess ?
new SynchronizedRandomAccessList<>(list) :
new SynchronizedList<>(list));
}
2
3
4
5
SynchronizedList 的部分源码,可以看到他也使用 synchronized 做同步

# 阻塞队列
这个接口叫 BolckingQueue,其下所有的实现类都是实现一定同步功能的。阻塞队列的意思是,该队列在被清空的时候自动阻塞拿取元素的方法,队列在满的时候自动阻塞存放元素的方法,优点是我们不用去关心底层的同步问题,可以用于解决生产者消费者
下面是一些阻塞队列的常见实现类:
ArrayBlockingQueue
LinkedBlockingQueue
LinkedBlockingDeque
PriorityBlockingQueue
2
3
4