侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Java 并发包之线程池和原子计数

Java  /  管理员 发布于 5年前   616

对于大数据量关联的业务处理逻辑,比较直接的想法就是用JDK提供的并发包去解决多线程情况下的业务数据处理。线程池可以提供很好的管理线程的方式,并且可以提高线程利用率,并发包中的原子计数在多线程的情况下可以让我们避免去写一些同步代码。

    这里就先把jdk并发包中的线程池处理器ThreadPoolExecutor 以原子计数类AomicInteger 和倒数计时锁CountDownLatch的一些常用api介绍下。

  一、ThreadPoolExecutor

JDK建议我们用工厂方法来创建线程池。Executors有四种工厂方法

1、Executors.newCachedThreadPool

2、Executors.newFixedThreadPool

3、Executors.newSingleThreadExecutor

4、Executors.newScheduledThreadPool()

 

     当然也可以自己订制线程池来管理线程。下面简单举个订制线程池的构造器例子:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(ThreadPoolUtil.CORE_POOL_SIZE, ThreadPoolUtil.MAX_POOL_SIZE, ThreadPoolUtil.KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ThreadPoolUtil.WORK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());

 

    这里结合execute的实现来简单解释一下参数的含义。

1、corePoolSize:核心线程的数目。也就是线程池维护的最小线程数目。如果Idle time超过了keepAliveTime,线程实例就会被终止,移出线程池。对于核心线程数目的大小需要根据不同的应用场景和CPU的情况来具体确定,以保证最佳的线程数。

2、maximumPoolSize:线程池维护的最大线程数目。对于无界队列,这个是形同虚设的。在有界队列中可以通过该项设置特殊的handler来处理线程。

3、keepAliveTime 以及unit是用来控制idleTime的。即线程池维护线程所允许的空闲时间以及时间单位。

4、workQueue任务队列:有多种队列可供选择,SynchronousQueue:直接提交类型的队列。不保持线程而是直接创建。LinkedBlockingQueue:无界队列,维持线程数目在corePoolSize之内不会创建新的线程,而是放到队列中,适合任务独立的线程类型。ArrayBlockingQueue:有界队列。有助于防止资源耗尽但是需要调整好合适的队列size以及最大线程数目。

5、policy:处理策略。也就是当任务数超过maximumPoolSize时的处理策略。可以自己定义单独的handler也可以用线程池提供的。

AbortPolicy:抛出java.util.concurrent.RejectedExecutionException异常

CallerRunsPolicy:重试添加当前的任务,他会自动重复调用execute()方法

DiscardOldestPolicy:抛弃旧的任务

DiscardPolicy:抛弃当前的任务

 

     通过以上几点可以看一下execute方法的内部。其处理的顺序基本上是corePoolSize会先填满,然后任务会放到队列中,有空闲的线程就会将队列中的任务“offer”出来执行,当队列也满了的时候就会交给handler来处理任务。

 

public void execute(Runnable command) {//任务为空的时候抛出空        if (command == null)            throw new NullPointerException();        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//添加线程到线程池中运行或者如果size>core放到队列中            if (runState == RUNNING && workQueue.offer(command)) {                if (runState != RUNNING || poolSize == 0)                    ensureQueuedTaskHandled(command);            }            else if (!addIfUnderMaximumPoolSize(command))//对于超出maxmum的线程,调用对应的reject策略                reject(command); // is shutdown or saturated        }    }

二、 AomicInteger计数器

    在多线程的情况下写一个计数器用同步的方式比较麻烦。并发包提供的AomicInteger将程序员从写一些同步代码中解脱出来。AomicInteger中最重要的方法就是incrementAndGet()。看一下源码:

 

  /**     * Atomically increments by one the current value.     *     * @return the updated value     */    public final int incrementAndGet() {        for (;;) {            int current = get();            int next = current + 1;            if (compareAndSet(current, next))                return next;        }    }

      其中compareAndSet方法是保证不会重复计数的关键。其中调用了CAS原语的操作,使得效率和准确性得以保证,但是起上一步在读next的时候如果另一个线程修改了当前值还是会出现问题,但这种问题的几率不高,而且也比通过synchronized和notify要更简便直接。每次调用incrementAndGet就会使AomicInteger的值加一返回,调用起来也很方便。

  三、  CountDownLatch

可以配合AomicInteger一起进行倒数计时。其控制了指定数目线程全部执行完相应的逻辑后进行减数计算,知道数值为0之后唤醒后续线程继续执行。在这次应用中控制所有的AomicInteger计数结束后,也即所有子线程执行结束之后,再运行父线程来返回结果。CountDownLatch 中常用的两个方法就是await()和countDown()。顾名思义,await方法会阻塞线程往后执行,而去继续执行子线程。countDown()也是原语操作。每次调用都会将latch的数值减一,其初始数值可以在构造其中指定,一般是需执行线程的size。在调用的时候为了确保其能countDown,建议将其放到finally代码块中。保证计数与否都要countDown,要不父线程就永远被阻塞在那里了。countDown调用的内部静态类Sync继承的AbstractQueuedSynchronizer的releaseShared()方法。在tryReleaseShared(1)返回true的时候会进一步执行 doReleaseShared()方法。看一下其内部处理:

 

private void doReleaseShared() {        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue; // loop to recheck cases                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue; // loop on failed CAS            }            if (h == head) // loop if head changed                break;        }    }

       Node是用来标记等待状态。如果是head就保证等待状态不被取消。如果是tail表示是等待队列的尾元素。通过CAS原语操作来判断其在操作系统中的waitStatus。



  • 上一条:
    Java实现的基于模板的网页结构化信息精准抽取组件:HtmlExtractor
    下一条:
    java Illegal overloaded getter method with ambiguous type for propert的解决
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在java中实现的脱敏工具类代码示例分享(0个评论)
    • zookeeper安装流程步骤(0个评论)
    • 在java中你背的“八股文”可能已经过时了(2个评论)
    • 在php8.0+版本中使用属性来增加值代码示例(3个评论)
    • java 正则表达式基础,实例学习资料收集大全 原创(0个评论)
    • 近期文章
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • PHP 8.4 Alpha 1现已发布!(0个评论)
    • Laravel 11.15版本发布 - Eloquent Builder中添加的泛型(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-11
    • 2018-03
    • 2020-03
    • 2023-05
    • 2023-11
    • 2024-01
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客