news 2026/4/17 9:01:45

Java多线程(十)ForkJoinPool

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Java多线程(十)ForkJoinPool

简介

Java7引入的线程池实现,适合于计算可以被递归执行的任务,并且这些任务都是是计算密集型的,不会有IO阻塞。

ForkJoinPool中有两个关键点:

  • 分治法:将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立、并且与原问题性质相同,求出子问题的解后,将这些解合并,就可以得出原问题的解,例如二分法。
  • 工作窃取:当某个线程的任务队列中没有任务时,从其他线程的任务队列中获取任务来执行,以充分利用工作线程的窃取能力,减少由于线程获取不到任务而造成的空闲浪费。在工作窃取机制中,每个线程都有自己的双端队列存储任务,线程从自身队列的尾部获取任务,此时是后进先出,如果当前线程空闲,会从其他线程的任务队列的头部获取任务,此时是先进先出,获取任务均是通过CAS操作实现的。

ForkJoinPool基于工作窃取算法,能高效利用多核处理器,提升并发性能。

入门案例

案例1:求start到end之间的和

需求:

/** * 求x到y之间的和 */publicclassSumTaskextendsRecursiveTask<BigInteger>{privatestaticfinalLongTHRESHOLD=102400000L;privatefinalIntegerstart;privatefinalIntegerend;publicSumTask(Integerstart,Integerend){this.start=start;this.end=end;}@OverrideprotectedBigIntegercompute(){if(end-start<THRESHOLD){BigIntegerresult=BigInteger.ZERO;longlen=end-start+1;longs=start;for(longi=0;i<len;i++){result=result.add(BigInteger.valueOf(s));s++;}returnresult;}else{intmid=start+(end-start)/2;SumTaskt1=newSumTask(start,mid);SumTaskt2=newSumTask(mid+1,end);invokeAll(t1,t2);BigIntegerjoin=t1.join();BigIntegerjoin1=t2.join();returnjoin.add(join1);}}}publicclassSumTaskTest{publicstaticvoidmain(String[]args){longstartTime=System.currentTimeMillis();ForkJoinPoolcommonPool=ForkJoinPool.commonPool();BigIntegerresult=commonPool.invoke(newSumTask(1,Integer.MAX_VALUE));longexecuteTime=System.currentTimeMillis()-startTime;System.out.println("结果 = "+result.toString()+" 耗时 = "+executeTime+"ms");}}
案例2:斐波那契数列

斐波那契数列,第一项是0还是1? 第一项为0和第一项为1,都符合定义,都可以正确计算。最初斐波那契在《计算之书》(1202年)中以兔子繁殖为例提出数列时,起始项为 1、1。 随着数学体系的发展,第一项为0被引入进来。通常如果没有特殊说明,第一项通常是0

importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.RecursiveTask;publicclassFibonacciTaskextendsRecursiveTask<Long>{privatestaticfinalIntegerTHRESHOLD=10;privatefinallongn;publicFibonacciTask(Longn){if(n==null){thrownewIllegalArgumentException("参数 n 不可为null");}if(n<0){thrownewIllegalArgumentException("参数 n 不可小于0");}this.n=n;}@OverrideprotectedLongcompute(){if(n<=THRESHOLD){returncomputeSequentially();}List<FibonacciTask>list=newArrayList<>();list.add(newFibonacciTask(n-1));list.add(newFibonacciTask(n-2));invokeAll(list);returnlist.get(0).join()+list.get(1).join();}/** * 计算斐波那契数列 * @return 成员变量n对应的数列值 */privateLongcomputeSequentially(){if(n<=1){returnn;}longa=0,b=1;for(longi=2;i<n+1;i++){longtmp=b;b=a+b;a=tmp;}returnb;}}importjava.util.concurrent.ForkJoinPool;publicclassFibonacciTaskTest{publicstaticvoidmain(String[]args){ForkJoinPoolcommonPool=ForkJoinPool.commonPool();longstartTime=System.currentTimeMillis();FibonacciTaskfibonacciTask=newFibonacciTask(100L);Longresult=commonPool.invoke(fibonacciTask);longexecuteTime=System.currentTimeMillis()-startTime;System.out.println(executeTime+"ms, result = "+result);}}
案例3:无返回值的异步任务,归并排序
publicclassMergeSortTask2<TextendsComparable<T>>extendsRecursiveAction{privatestaticfinalIntegerTHRESHOLD=100000;privatefinalT[]arr;privatefinalintstart;privatefinalintend;publicMergeSortTask2(T[]arr){this.arr=arr;start=0;end=arr.length-1;}privateMergeSortTask2(T[]arr,intstart,intend){this.arr=arr;this.start=start;this.end=end;}@Overrideprotectedvoidcompute(){if(end-start<=THRESHOLD){mergeSort(arr,start,end);}else{intmid=calMid(start,end);MergeSortTask2<T>task1=newMergeSortTask2<>(arr,start,mid);MergeSortTask2<T>task2=newMergeSortTask2<>(arr,mid+1,end);// 先提交task1到线程池,再在当前线程计算task2,然后再阻塞地等待task1的结果task1.fork();task2.compute();task1.join();merge(arr,start,mid,end);}}/** * 计算start和end的中间值 */privateintcalMid(intstart,intend){returnstart+(end-start)/2;}// 归并排序privatevoidmergeSort(T[]arr,intleft,intright){if(left==right){return;}intmid=calMid(left,right);mergeSort(arr,left,mid);mergeSort(arr,mid+1,right);merge(arr,left,mid,right);}@SuppressWarnings("unchecked")privatevoidmerge(T[]arr,intstart,intmid,intend){intlen=end-start+1;T[]tmpArr=(T[])newComparable[len];inti=start;intj=mid+1;intk=0;while(i<=mid&&j<=end){if(arr[i].compareTo(arr[j])<=0){tmpArr[k++]=arr[i++];}else{tmpArr[k++]=arr[j++];}}while(i<=mid){tmpArr[k++]=arr[i++];}while(j<=end){tmpArr[k++]=arr[j++];}System.arraycopy(tmpArr,0,arr,start,len);}}

测试:

@Testpublicvoidtest3(){Integer[]arr=buildIntegerArr(n);// 100000000longstartTime=System.currentTimeMillis();MergeSortTask2<Integer>integerMergeSortTask=newMergeSortTask2<>(arr);integerMergeSortTask.invoke();// 1亿大小的数组,使用ForkJoinPool排序,花费 98438ms// System.out.println("arr = " + Arrays.toString(arr));System.out.println("result = "+(System.currentTimeMillis()-startTime)+"ms");}/** * 构建待排序的数组 */privatestaticInteger[]buildIntegerArr(intlimit){longstartTime=System.currentTimeMillis();Integer[]arr=newInteger[limit];Randomrandom=newRandom();for(inti=0;i<limit;i++){arr[i]=random.nextInt(limit*10);}System.out.println("构建测试"+limit+"大小的数组,花费 "+(System.currentTimeMillis()-startTime)+"ms");returnarr;}
案例4:ForkJoinPool 状态监控
/** * 打印ForkJoinPool的状态 */publicstaticvoidprintCommonPoolStatus(ForkJoinPoolcommonPool){if(commonPool==null){return;}System.out.println("----------------------");System.out.println("并行度 = "+commonPool.getParallelism()+"\n"+"工作线程数 = "+commonPool.getPoolSize()+"\n"+"活跃的线程数 = "+commonPool.getActiveThreadCount()+"\n"+"运行中的线程数 = "+commonPool.getRunningThreadCount()+"\n"+"排队提交的数量 = "+commonPool.getQueuedSubmissionCount()+"\n"+"排队任务的数量 = "+commonPool.getQueuedTaskCount()+"\n"+"当前所有线程是否空闲中 = "+commonPool.isQuiescent()+"\n"+"偷窃任务数 = "+commonPool.getStealCount()+"\n"+"是否异步模式 = "+commonPool.getAsyncMode()+"\n"+"是否还有未执行的任务 = "+commonPool.hasQueuedSubmissions()+"\n"+"线程工厂 = "+commonPool.getFactory()+"\n"+"异常处理器 = "+commonPool.getUncaughtExceptionHandler());System.out.println("======================");}

提交任务后,另起一个线程,循环调用这个方法,打印线程池的状态

基本使用

ForkJoinPool,自带一个公共线程池,线程数是CPU核数减1,如果用户向ForkJoinPool中提交任务时,没有指定线程池,就是案例中的那种写法,就使用默认线程池。

向ForkJoinPool中提交任务时指定线程池:

ForkJoinPoolforkJoinPool=newForkJoinPool(4);forkJoinPool.execute(integerMergeSortTask);

ForkJoinPool中执行的任务,需要继承RecursiveTask或RecursiveAction,一个是有返回值的任务,一个是无返回值的任务,任务本身负责自身的拆分,直到拆分到一定阈值再执行。这里就是把单线程执行的计算任务拆分为多线程可以执行的,最后再合并它的结果。同时基于工作窃取机制,确保执行任务时所有线程都不会空闲,除非数据倾斜导致某些任务的计算量比较大,否则所有线程都可以保证一个很好的效率,好的一点是如何拆分数据取决于用户,他可以在拆分时避免数据倾斜。

向线程池中提交task,或者调用task本身的invoke方法,都可以把任务提交到线程池。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 5:23:06

MonitorControl终极指南:轻松掌控Mac外接显示器设置

MonitorControl终极指南&#xff1a;轻松掌控Mac外接显示器设置 【免费下载链接】MonitorControl MonitorControl/MonitorControl: MonitorControl 是一款开源的Mac应用程序&#xff0c;允许用户直接控制外部显示器的亮度、对比度和其他设置&#xff0c;而无需依赖原厂提供的软…

作者头像 李华
网站建设 2026/4/13 20:17:24

React Doc Viewer 终极指南:如何在React应用中轻松实现文件预览

React Doc Viewer 终极指南&#xff1a;如何在React应用中轻松实现文件预览 【免费下载链接】react-doc-viewer File viewer for React. 项目地址: https://gitcode.com/gh_mirrors/re/react-doc-viewer React Doc Viewer 是一个强大的React文档查看器组件&#xff0c;让…

作者头像 李华
网站建设 2026/4/18 7:55:40

D2RML:暗黑破坏神2重制版多开启动器完全指南

D2RML&#xff1a;暗黑破坏神2重制版多开启动器完全指南 【免费下载链接】D2RML Diablo 2 Resurrected Multilauncher 项目地址: https://gitcode.com/gh_mirrors/d2/D2RML D2RML是一款专为暗黑破坏神2重制版设计的智能多开启动器&#xff0c;通过先进的令牌管理系统彻底…

作者头像 李华
网站建设 2026/4/18 3:03:32

JSXBin解码逆向解析实战指南:三步搞定二进制文件转换

JSXBin解码逆向解析实战指南&#xff1a;三步搞定二进制文件转换 【免费下载链接】jsxbin-to-jsx-converter JSXBin to JSX Converter written in C# 项目地址: https://gitcode.com/gh_mirrors/js/jsxbin-to-jsx-converter 你是否曾经面对一堆JSXBin二进制文件束手无策…

作者头像 李华
网站建设 2026/4/18 5:22:08

终极窗口管理神器:PersistentWindows让多屏办公效率翻倍

终极窗口管理神器&#xff1a;PersistentWindows让多屏办公效率翻倍 【免费下载链接】PersistentWindows fork of http://www.ninjacrab.com/persistent-windows/ with windows 10 update 项目地址: https://gitcode.com/gh_mirrors/pe/PersistentWindows 还在为每次插拔…

作者头像 李华
网站建设 2026/4/17 12:44:15

Paperless-ngx多语言本地化终极指南:从配置到实战完整教程

Paperless-ngx多语言本地化终极指南&#xff1a;从配置到实战完整教程 【免费下载链接】paperless-ngx A community-supported supercharged version of paperless: scan, index and archive all your physical documents 项目地址: https://gitcode.com/GitHub_Trending/pa/…

作者头像 李华