0%

ConCurrentHashMap

ConCurrentHashMap

ConCurrentHashMap底层是:散列表 + 红黑树,与HashMap一样

根据顶部的注释,大概意思为

支持高并发的检索和更新

线城市安全的,并且检索操作是不用加锁的

get方法非阻塞

检索出来结果是最新设置的值

一些关于同届的方法,最好是在单线程的环境下使用,不然它只满足监控或估算的目的,在项目中(多环境下)使用它是无法准确返回的

当有太多的散列碰撞,该表会动态增长

再散列(扩容)是一件非常耗费资源的操作,最好是提前计算放入容器中有多少的元素来手动初始化装载因子和初始容量

当很多的key的HashCode相等时会非常影响性能的(散列冲突),key实现Comparable接口(直接定义比较key),会好一点

实现了Map和Iterator的所有方法

ConCurrentHashMap不允许key或value为null

ConCurrentHashMap提供的方法支持批量操作

Hashtable与ConCurrentHashMap区别

  • Hashtable是在每个方法上都加了Synchronized完成同步,效率低下
  • ConCurrentHashMap通过在部分加锁和利用CAS算法来实现同步

CAS算法和volatile

CAS(比较与交换,Compare and swap)是一种有名的无锁算法

CAS有3个操作数

  • 内存值V
  • 旧的预期值A
  • 要修改的新值B

当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做

  • 当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值(A和内存值V相同时,将内存值B修改为B),而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败了,并可以再次尝试(否则什么都不做)

volatile经典总结:volatile仅仅用来保证该变量对所有线程的可见性,但不保证原子性

即:

  • 保证该变量对所有现成的可见性

    • 在多线程的环境下:当这个变量修改时,所有的线程都会知道该变量被修改了
  • 不保证原子性

    • 修改变量(赋值)实质上实在JVM中分了好几步,而在这几步内(从装载变量到修改),它是不安全的

可以参考以下博客

为什么volatile不能保证原子性而Atomic可以

非阻塞同步算法与CAS


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//散列表,迭代器就是迭代它
transient volatile Node<K,V>[] table;

//下一张表要使用,除了在扩容的时候,其余时间它都为null
private transient volatile Node<K,V>[] nextTable;

//基础计数器,通过CAS来更新
private transient volatile long baseCount;

//散列表初始化和扩容都是由这个变量来控制
//当为负数时,它正在被初始化或者在扩容(-1表示正在初始化,-N表示N-1个线程在进行扩容
//默认是0,初始化之后,保存着下一次扩容的大小
private transient volatile int sizeCtl;

//分割表时用的索引值
private transient volatile int transferIndex;

//与计算size相关
private transient volatile int cellsBusy;

//同上
private transient volatile CounterCell[] counterCells;

//视图
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//默认初始容量为16
public ConcurrentHashMap() {
}

//给定初始容量,这样就不用过度依赖动态扩容了
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, LOAD_FACTOR, 1);
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

//估计并发更新的线程数量
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
//赋值给sizeCtl属性:这是下次扩容的大小
this.sizeCtl = cap;
}

//获取参数且最接近2的整次幂
private static final int tableSizeFor(int c) {
int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
   public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//对key进行散列,获取哈希值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//当表为null时,进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果这个哈希值可以直接存到数组,就直接插入进去(不用加锁)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//插入的位置是表的链接点时,就表明在扩容,帮助当前线程扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
//处理散列冲突
else {
V oldVal = null;
//加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
//节点的方式处理
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果涉及到相同的key进行put操作时会覆盖原来的value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//在链表尾部插入节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//按照树的方式插入
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//将节点插入红黑树,涉及红黑树节点的旋转操作
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
//链表长度大于8,把链表结构转成树形结构
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//统计size,并检查是否需要扩容
addCount(1L, binCount);
return null;
}

//!只让一个线程对列表进行初始化!
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//有线程正在初始化,告诉其它线程不要进来了
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//设置为-1,说明本线程正在初始化
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//相当于0.75*n设置一个扩容的阈值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

get

不用加锁,是非阻塞的

Node节点是重写的,设置了volatile关键字修饰,致使它每次获取的都是最新设置的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果在桶子上,就直接获取
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//在树形结构上
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
//在链表上
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

总结

  • 在高并发环境下,统计数据(计算size等)是无意义的,因为在下一时刻size值就发生变化了
  • get方法是非阻塞,无锁的。重写Node类,通过volatile修饰next来实现每次获取都是最新设置的值
  • ConcurrentHashMap的key和Value都不能为null

ConcurrentHashMap原理分析