ConcurrentHashMap
是 Java 中的一个线程安全的哈希表实现,它可以在多线程环境下提供高效的并发读写操作。ConcurrentHashMap
继承自 AbstractMap
类,实现了 Map
接口,同时还实现了 java.util.concurrent.ConcurrentMap
接口,提供了更多的并发操作方法。
ConcurrentHashMap
的特点:
- 线程安全:
ConcurrentHashMap
是线程安全的,多个线程可以同时对它进行读写操作,而不会出现数据不一致的问题。 - 高效性:
ConcurrentHashMap
在多线程环境下提供了高效的并发读写操作,能够充分利用多核 CPU 的优势。 - 分段锁:
ConcurrentHashMap
使用分段锁(Segment)来保证线程安全,将整个哈希表分成多个小的哈希表,每个小的哈希表都有一个独立的锁,不同的线程可以同时访问不同的小哈希表,从而实现了并发访问。 - 高并发性:
ConcurrentHashMap
在高并发情况下表现良好,能够支持大量的并发读写操作。 - 可扩展性:
ConcurrentHashMap
支持动态扩容,可以根据需要自动调整大小,从而提高存储容量和并发性能。
总之,ConcurrentHashMap
是一个线程安全、高效、可扩展的哈希表实现,适用于多线程环境下的高并发读写操作。
注意:JDK1.8之后取消了分段锁的实现方式,改用了一种基于 CAS (Compare and Swap) 操作和链表/红黑树的实现方式。具体来说,ConcurrentHashMap 的每个桶(Bucket)都是一个 Node 数组,每个 Node 包含了一个 key-value 对以及一个 next 指针,用来指向下一个 Node。
为什么要引入
红黑树呢?在正常情况下,key hash 之后如果能够很均匀的分散在数组中,那么 table 数
组中的每个队列的长度主要为 0 或者 1.但是实际情况下,还是会存在一些队列长度过长的
情况。如果还采用单向列表方式,那么查询某个节点的时间复杂度就变为 O(n); 因此对于
队列长度超过 8 的列表,JDK1.8 采用了红黑树的结构,那么查询的时间复杂度就会降低到
O(logN),可以提升查找的性能;
如下图所示JDK1.7之前
JDK1.8之后:
ConcurrentHashMap的使用
就和HashMap
一样的使用只不过ConcurrentHashMap
底层帮你实现了线程安全
创建 ConcurrentHashMap 对象
可以使用无参构造函数来创建 ConcurrentHashMap 对象:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
添加元素
ConcurrentHashMap 的 put() 方法可以用来添加元素,它是线程安全的。例如:
map.put("apple", 1);
map.put("banana", 2);
map.put("orange", 3);
获取元素
ConcurrentHashMap 的 get() 方法可以用来获取元素,它也是线程安全的。例如:
Integer value = map.get("apple");
删除元素
ConcurrentHashMap 的 remove() 方法可以用来删除元素,它也是线程安全的。例如:
map.remove("banana");
遍历元素
ConcurrentHashMap 可以使用 forEach() 方法来遍历元素。例如:
map.forEach((key, value) -> {
System.out.println(key + " : " + value);
});
源码分析
put方法
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//计算 hash 值
int hash = spread(key.hashCode());
//用来记录链表的长度
int binCount = 0;
//这里其实就是自旋操作,当出现线程竞争时不断自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
这个是整块的代码有点长分开看
第一个阶段Map是null的情况
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//如果数组为空,则进行数组初始化
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//通过 hash 值对应的数组下标得到第一个节点; 以 volatile 读的方式来读取
table 数组中的元素,保证每次拿到的数据都是最新的
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
如果此时这个位置是null直接通过cas赋值返回
break; // no lock when adding to empty bin
}
这里还涉及几个方法
initTable
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl如果等于-1则说明正在初始化
if ((sc = sizeCtl) < 0)
//所以这里会交出CPU时间片继续竞争CPU竞争到了继续判断是否还在初始化
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//如果不是在初始化这个时候就会将sizeCtl值修改成-1表示我们需要进行初始化操作
try {
if ((tab = table) == null || tab.length == 0) {
//这个就是初始化的默认大小就是16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算下次扩容的大小,实际就是当前容量的 0.75倍,这里使用了右移来计算
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
数组初始化方法,这个方法比较简单,就是初始化一个合适大小的数组
tabAt
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
这个也比较简单就是以 volatile 读的方式来读取
table 数组中的元素,保证每次拿到的数据都是最新的
casTabAt
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
通过CAS操作来改变Tab中的元素保证线程安全
注:((long)i << ASHIFT) + ABASE就是算出这个元素的内存地址
第二个阶段addCount
添加完元素之后此时需要将map长度计数加一,所以会调用addCount方法
这里需要特别说明一下这里的计数用了两种方式一个就是使用一个int变量计数还一种方式是使用CounterCell链表计数
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//判断 counterCells 是否为空,
//1.如果为空,就通过 cas 操作尝试修改 baseCount 变量,对这个变量进行原子
累加操作(做这个操作的意义是:如果在没有竞争的情况下,仍然采用 baseCount来记录元素个数
//2.如果 cas 失败说明存在竞争,这个时候不能再采用 baseCount 来累加,而是通过CounterCell 来记录
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
//是否冲突标识,默认为没有冲突
boolean uncontended = true;
/*
这里有几个判断
1. 计数表为空则直接调用 fullAddCount
2. 从计数表中随机取出一个数组的位置为空,直接调用 fullAddCount
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//通过 CAS 修改 CounterCell 随机位置的值,如果修改失败说明出现并发情况(这里又
//用到了一种巧妙的方法),调用 fullAndCount
//Random 在线程并发的时候会有性能问题以及可能会产生相同的随机
//数,ThreadLocalRandom.getProbe 可以解决这个问题,并且性能要比 Random 高
//如果没有修改成功则说明出现并发情况则调用fullAddCount
fullAddCount(x, uncontended);
return;
}
//链表长度小于等于 1,不需要考虑扩容
if (check <= 1)
return;
//统计 ConcurrentHashMap 元素个数
s = sumCount();
}
//这里则说明需要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
这里可能有人会有疑问累加计数不是一个很简单的操作的为什么那么复杂,原因就在于这个Map是线程安全,为了保证线程安全,那我们的计数肯定也要线程安全,所以每次在改变这计数的时候是需要加锁的,但是呢每次如果这里加锁了那么上面的操作不就白费了吗不管你怎么分段加锁,走到这里还是要等待,只能一个一个来,极大的降低了效率。 所以这里就用了分片的方法来记录大小,也就是还会有个链表,每次加操作过来都会哈希一个位置记录当前的数字或者直接累加,当需要总技术的时候只需要链表所有值累加即可,这个链表也类似于上文中的给每个桶子加锁就可以了,这样既保证了线程安全,从而也提升了效率
这里我们可以看一下CounterCell结构
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
CounterCell 数组的每个元素,都存储一个元素个数,而实际我们调用size 方法就是通过这个循环累加来得到的
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
fullAddCount方法
fullAddCount 主要是用来初始化 CounterCell,来记录元素个数,里面包含扩容,初始化等
操作
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//获取当前线程的 probe 的值,如果值为 0,则初始化当前线程的 probe 的值,probe 就是随机数
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
//说明 counterCells 已经被初始化过了,
if ((a = as[(n - 1) & h]) == null) {
// 通过该值与当前线程 probe 求与,获得cells 的下标元素,和 hash 表获取索引是一样的
if (cellsBusy == 0) { //这里是判断 CounterCell是否正在初始化
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&//通过 cas 设置 cellsBusy 标识,
防止其他线程来对 counterCells 并发处理
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j
//将初始化的 r 对象的元素个数放在对应下标的位置
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//恢复标志位
cellsBusy = 0;
}
if (created)
//创建成功,退出循环
break;
//说明指定 cells 下标位置的数据不为空,则进行下一次循环
continue; // Slot is now non-empty
}
}
collide = false;
}
//说明在 addCount 方法中 cas 失败了,并且获取 probe 的值不为空
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
//由于指定下标位置的 cell 值不为空,则直接通过 cas 进行原子累加,如果成功,则直接退出
break;
else if (counterCells != as || n >= NCPU)
collide = false; ///设置当前线程的循环失败不进行扩容
else if (!collide) //恢复 collide 状态,标识下次循环会进行扩容
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//进入这个步骤,说明 CounterCell 数组容量不够,线程竞争较大,
所以先设置一个标识 表示为正在扩容
try {
if (counterCells == as) {// Expand table unless stale
//扩容一倍 2 变成 4,这个扩容比较简单
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//cellsBusy=0 表示没有在做初始化,通过 cas 更新 cellsbusy
的值标注当前线程正在做初始化操作
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
//竞争激烈,其它线程占据 cell 数组,直接累加在 base 变量中
break; // Fall back on using base
}
}
addCount原理图
第三个阶段
//这里代表正在扩容
else if ((fh = f.hash) == MOVED)
//其他线程就帮助扩容
tab = helpTransfer(tab, f);
第四个阶段
这里主要是需要添加的结点已经有值的时候说明需要以链表的方式添加进来
else {//进入到这个分支,说明 f 是当前 nodes 数组对应位置节点的头节点,并且不为null
V oldVal = null;
synchronized (f) {
//给对应的头结点加锁
if (tabAt(tab, i) == f) {
//再次判断对应下标位置是否为 f 节点
if (fh >= 0) {
//头结点的 hash 值大于 0,说明是链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果发现相同的 key,则判断是否需要进行值的覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
////一直遍历到链表的最末端,直接把新的值加入到链表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果当前的 f 节点是一颗红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//调用红黑树的插入方法插入新的值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//判断链表的长度是否已经达到临界值 8. 如果达到了临界值
,这个时候会根据当前数组的长度 来决定是扩容还是将链表转化为红黑树。
也就是说如果当前数组的长度小于 64,就会先扩容。 否则,
会把当前链表转化为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
至此put方法就完成了
最终形成的结构如下所示
get方法
这个方法的源码就比较简单了
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//获取哈希数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//找到对应结点后判断key是否一致
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//一致的话直接返回
return e.val;
}
//不一致的话则从链表中查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}