侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

concurrenthashmap为什么是线程安全

技术  /  管理员 发布于 7年前   165

一、HashMap和ConcurrentHashMap的对比

我们用一段代码证明下HashMap的线程不安全,以及ConcurrentHashMap的线程安全性。代码逻辑很简单,开启10000个线程,每个线程做很简单的操作,就是put一个key,然后删除一个key,理论上线程安全的情况下,最后map的size()肯定为0。

推荐教程:《java学习》

Map<Object,Object> myMap=new HashMap<>();        // ConcurrentHashMap myMap=new ConcurrentHashMap();        for (int i = 0; i <10000 ; i++) {new Thread(new Runnable() {    @Override    public void run() {double a=Math.random();myMap.put(a,a);myMap.remove(a);    }}).start();        }        Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。        System.out.println(myMap.size());

这里显示Map的size=13,但是实际上map里还有一个key。 同样的代码我们用ConcurrentHashMap来运行下,结果map ==0

这里也就证明了ConcurrentHashMap是线程安全的,我们接下来从源码分析下ConcurrentHashMap是如何保证线程安全的,本次源码jdk版本为1.8。

二、ConcurrentHashMap源码分析

3.1 ConcurrentHashMap的基础属性

//默认最大的容量 private static final int MAXIMUM_CAPACITY = 1 << 30;//默认初始化的容量private static final int DEFAULT_CAPACITY = 16;//最大的数组可能长度static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;//默认的并发级别,目前并没有用,只是为了保持兼容性private static final int DEFAULT_CONCURRENCY_LEVEL = 16;//和hashMap一样,负载因子private static final float LOAD_FACTOR = 0.75f;//和HashMap一样,链表转换为红黑树的阈值,默认是8static final int TREEIFY_THRESHOLD = 8;//红黑树转换链表的阀值,默认是6static final int UNTREEIFY_THRESHOLD = 6;//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容static final int MIN_TREEIFY_CAPACITY = 64;//table扩容时, 每个线程最少迁移table的槽位个数private static final int MIN_TRANSFER_STRIDE = 16;//感觉是用来计算偏移量和线程数量的标记private static int RESIZE_STAMP_BITS = 16;//能够调整的最大线程数量private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;//记录偏移量private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容static final int MOVED     = -1;//TREEBIN, 置为-2, 代表此元素后接红黑树static final int TREEBIN   = -2;//感觉是占位符,目前没看出来明显的作用static final int RESERVED  = -3;//主要用来计算Hash值的static final int HASH_BITS = 0x7fffffff; //节点数组transient volatile Node<K,V>[] table;//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上private transient volatile Node<K,V>[] nextTable;//基础计数器private transient volatile long baseCount;//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小private transient volatile int sizeCtl;//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标private transient volatile int transferIndex;//扩容时候,CAS锁标记private transient volatile int cellsBusy;//计数器表,大小是2次幂private transient volatile CounterCell[] counterCells;

上面就是ConcurrentHashMap的基本属性,我们大部分和HashMap一样,只是增加了部分属性,后面我们来分析增加的部分属性是起到如何的作用的。

2.2 ConcurrentHashMap的常用方法属性

put方法

final V putVal(K key, V value, boolean onlyIfAbsent) {        //key和value不允许为null        if (key == null || value == null) throw new NullPointerException();        //计算hash值        int hash = spread(key.hashCode());        int binCount = 0;        for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;//如果table没有初始化,进行初始化if (tab == null || (n = tab.length) == 0)    tab = initTable();//计算数组的位置    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    //如果该位置为空,构造新节点添加即可    if (casTabAt(tab, i, null,     new Node<K,V>(hash, key, value, null)))        break;       // no lock when adding to empty bin}//如果正在扩容else if ((fh = f.hash) == MOVED)//帮着一起扩容    tab = helpTransfer(tab, f);else {//开始真正的插入    V oldVal = null;    synchronized (f) {        if (tabAt(tab, i) == f) {        //如果已经初始化完成了if (fh >= 0) {    binCount = 1;    for (Node<K,V> e = f;; ++binCount) {        K ek;        //这里key相同直接覆盖原来的节点        if (e.hash == hash &&((ek = e.key) == key || (ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)    e.val = value;break;        }        Node<K,V> pred = e;        //否则添加到节点的最后面        if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,  value, null);break;        }    }}//如果节点是树节点,就进行树节点添加操作else if (f instanceof TreeBin) {    Node<K,V> p;    binCount = 2;    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {        oldVal = p.val;        if (!onlyIfAbsent)p.val = value;    }}        }    }//判断节点是否要转换成红黑树    if (binCount != 0) {        if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);//红黑树转换        if (oldVal != null)return oldVal;        break;    }}        }        //计数器,采用CAS计算size大小,并且检查是否需要扩容        addCount(1L, binCount);        return null;    }

我们发现ConcurrentHashMap的put方法和HashMap的逻辑相差不大,主要是新增了线程安全部分,在添加元素时候,采用synchronized来保证线程安全,然后计算size的时候采用CAS操作进行计算。整个put流程比较简单,总结下就是:

1.判断key和vaule是否为空,如果为空,直接抛出异常。

2.判断table数组是否已经初始化完毕,如果没有初始化,进行初始化。

3.计算key的hash值,如果该位置为空,直接构造节点放入。

4.如果table正在扩容,进入帮助扩容方法。

5.最后开启同步锁,进行插入操作,如果开启了覆盖选项,直接覆盖,否则,构造节点添加到尾部,如果节点数超过红黑树阈值,进行红黑树转换。如果当前节点是树节点,进行树插入操作。

6.最后统计size大小,并计算是否需要扩容。

get方法

public V get(Object key) {        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;        //计算hash值        int h = spread(key.hashCode());        //如果table已经初始化,并且计算hash值的索引位置node不为空        if ((tab = table) != null && (n = tab.length) > 0 &&(e = tabAt(tab, (n - 1) & h)) != null) {//如果hash相等,key相等,直接返回该节点的valueif ((eh = e.hash) == h) {    if ((ek = e.key) == key || (ek != null && key.equals(ek)))        return e.val;}//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。else if (eh < 0)    return (p = e.find(h, key)) != null ? p.val : null;//循环遍历链表,查询key和hash值相等的节点。while ((e = e.next) != null) {    if (e.hash == h &&        ((ek = e.key) == key || (ek != null && key.equals(ek))))        return e.val;}        }        return null;    }

get方法比较简单,主要流程如下:

1.直接计算hash值,查找的节点如果key和hash值相等,直接返回该节点的value就行。

2.如果table正在扩容,就调用ForwardingNode的find方法查找节点。

3.如果没有扩容,直接循环遍历链表,查找到key和hash值一样的节点值即可。

ConcurrentHashMap的扩容

ConcurrentHashMap的扩容相对于HashMap的扩容相对复杂,因为涉及到了多线程操作,这里扩容方法主要是transfer,我们来分析下这个方法的源码,研究下是如何扩容的。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {        int n = tab.length, stride;        //保证每个线程扩容最少是16,        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range        if (nextTab == null) {// initiatingtry {//扩容2倍    @SuppressWarnings("unchecked")    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];    nextTab = nt;} catch (Throwable ex) {      // try to cope with OOME    //出现异常情况就不扩容了。    sizeCtl = Integer.MAX_VALUE;    return;}//用新数组对象接收nextTable = nextTab;//初始化扩容下表为原数组的长度transferIndex = n;        }        int nextn = nextTab.length;        //扩容期间的过渡节点        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);        boolean advance = true;        boolean finishing = false; // to ensure sweep before committing nextTab    for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;while (advance) {    int nextIndex, nextBound;    //如果该线程已经完成了    if (--i >= bound || finishing)        advance = false;    //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了    else if ((nextIndex = transferIndex) <= 0) {        i = -1;        advance = false;    }CAS操作设置区间    else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex,  nextBound = (nextIndex > stride ?   nextIndex - stride : 0))) {        bound = nextBound;        i = nextIndex - 1;        advance = false;    }}//如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了if (i < 0 || i >= n || i + n >= nextn) {    int sc;    if (finishing) {//如果扩容完成了,进行收尾工作        nextTable = null;//清空临时数组        table = nextTab;//赋值原数组        sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl        return;    }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {    //这里应该是判断扩容是否结束        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;        //结束,赋值状态        finishing = advance = true;        i = n; // recheck before commit    }}//如果在table中没找到,就用过渡节点else if ((f = tabAt(tab, i)) == null)    //成功设置就进入下一个节点    advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)    //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可    advance = true; // already processedelse {//这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁    synchronized (f) {        if (tabAt(tab, i) == f) {Node<K,V> ln, hn;//如果hash值大于0if (fh >= 0) {//为运算结果    int runBit = fh & n;    Node<K,V> lastRun = f;    for (Node<K,V> p = f.next; p != null; p = p.next) {        int b = p.hash & n;        if (b != runBit) {runBit = b;lastRun = p;        }    }    if (runBit == 0) {        ln = lastRun;        hn = null;    }    else {        hn = lastRun;        ln = null;    }    for (Node<K,V> p = f; p != lastRun; p = p.next) {        int ph = p.hash; K pk = p.key; V pv = p.val;        //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章        if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);        elsehn = new Node<K,V>(ph, pk, pv, hn);    }    setTabAt(nextTab, i, ln);    setTabAt(nextTab, i + n, hn);    setTabAt(tab, i, fwd);    advance = true;}//如果是树节点,执行树节点的扩容数据转移else if (f instanceof TreeBin) {    TreeBin<K,V> t = (TreeBin<K,V>)f;    TreeNode<K,V> lo = null, loTail = null;    TreeNode<K,V> hi = null, hiTail = null;    int lc = 0, hc = 0;    for (Node<K,V> e = t.first; e != null; e = e.next) {        int h = e.hash;        TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);        //也是通过位运算判断两个链表的位置if ((h & n) == 0) {if ((p.prev = loTail) == null)    lo = p;else    loTail.next = p;loTail = p;++lc;        }        else {if ((p.prev = hiTail) == null)    hi = p;else    hiTail.next = p;hiTail = p;++hc;        }    }    //这里判断是否进行树转换    ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :        (hc != 0) ? new TreeBin<K,V>(lo) : t;    hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :        (lc != 0) ? new TreeBin<K,V>(hi) : t;   //这里把新处理的链表赋值到新数组中    setTabAt(nextTab, i, ln);    setTabAt(nextTab, i + n, hn);    setTabAt(tab, i, fwd);    advance = true;}        }    }}        }    }

ConcurrentHashMap的扩容还是比较复杂,复杂主要表现在,控制多线程扩容层面上,扩容的源码我没有解析的很细,一方面是确实比较复杂,本人有某些地方也不是太明白,另一方面是我觉得我们研究主要是弄懂其思想,能搞明白关键代码和关键思路即可,只要不是重新实现一套类似的功能,我想就不用纠结其全部细节了。总结下ConcurrentHashMap的扩容步骤如下:

1.获取线程扩容处理步长,最少是16,也就是单个线程处理扩容的节点个数。

2.新建一个原来容量2倍的数组,构造过渡节点,用于扩容期间的查询操作。

3.进行死循环进行转移节点,主要根据finishing变量判断是否扩容结束,在扩容期间通过给不同的线程设置不同的下表索引进行扩容操作,就是不同的线程,操作的数组分段不一样,同时利用synchronized同步锁锁住操作的节点,保证了线程安全。

4.真正进行节点在新数组的位置是和HashMap扩容逻辑一样的,通过位运算计算出新链表是否位于原位置或者位于原位置+扩容的长度位置,具体分析可以查看我的这篇文章。

三、总结

1.ConcurrentHashMap大部分的逻辑代码和HashMap是一样的,主要通过synchronized和来保证节点插入扩容的线程安全,这里肯定有同学会问,为啥使用synchronized呢?而不用采取乐观锁,或者lock呢?我个人觉得可能原因有2点:

a.乐观锁比较适用于竞争冲突比较少的场景,如果冲突比较多,那么就会导致不停的重试,这样反而性能更低。

b.synchronized在经历了优化之后,其实性能已经和lock没啥差异了,某些场景可能还比lock快。所以,我觉得这是采用synchronized来同步的原因。

2.ConcurrentHashMap的扩容核心逻辑主要是给不同的线程分配不同的数组下标,然后每个线程处理各自下表区间的节点。同时处理节点复用了hashMap的逻辑,通过位运行,可以知道节点扩容后的位置,要么在原位置,要么在原位置+oldlength位置,最后直接赋值即可。

以上就是concurrenthashmap为什么是线程安全的详细内容,更多请关注其它相关文章!


  • 上一条:
    cpu fclk频率如何设置
    下一条:
    怎样将u盘文件拷到另一u盘
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 2024.07.09日OpenAI将终止对中国等国家和地区API服务(0个评论)
    • 2024/6/9最新免费公益节点SSR/V2ray/Shadowrocket/Clash节点分享|科学上网|免费梯子(1个评论)
    • 国外服务器实现api.openai.com反代nginx配置(0个评论)
    • 2024/4/28最新免费公益节点SSR/V2ray/Shadowrocket/Clash节点分享|科学上网|免费梯子(1个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2017-07
    • 2017-08
    • 2017-09
    • 2018-01
    • 2018-07
    • 2018-08
    • 2018-09
    • 2018-12
    • 2019-01
    • 2019-02
    • 2019-03
    • 2019-04
    • 2019-05
    • 2019-06
    • 2019-07
    • 2019-08
    • 2019-09
    • 2019-10
    • 2019-11
    • 2019-12
    • 2020-01
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-08
    • 2020-09
    • 2020-10
    • 2020-11
    • 2021-04
    • 2021-05
    • 2021-06
    • 2021-07
    • 2021-08
    • 2021-09
    • 2021-10
    • 2021-12
    • 2022-01
    • 2022-02
    • 2022-03
    • 2022-04
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-06
    • 2023-07
    • 2023-08
    • 2023-09
    • 2023-10
    • 2023-12
    • 2024-02
    • 2024-04
    • 2024-05
    • 2024-06
    • 2025-02
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客