项目中用到夜间跑批的问题,考虑使用多线程同时跑批,每次拿出100批数据,创建定长线程池,长度
为10,然后将这100批数据分成10份,10个线程各自处理自己的那一部分,每当有一个线程处理完成后就会
进入等待,计数器减1,直到计数器为0时,说明每个线程都完成了自己的工作,然后进入主线程执行。
1:创建定长线程池
1 public static final int THREAD_NUM = 10;2 public ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUM);// 创建10个线程放入线程池内
2:创建线程任务
1 public class myRun implements Runnable{ 2 3 private List
3:主线程执行任务
public void doHandler() { try { List> list = getList(); final CountDownLatch countdown = new CountDownLatch(THREAD_NUM);// 子线程数 int oneThreadCount = list.size()/THREAD_NUM + 1; for (int num = 0; num < THREAD_NUM; num++) { // 开启线程个数 System.out.println("----------------共有的线程个数:" + countdown.getCount()); threadPool.execute(new myRun(list,num,countdown,oneThreadCount)); } countdown.await();// 这里进行同步等待,等所有子线程结束后,执行 countdown.await()后面的代码 System.out.println("##结束等待------------------------"); } catch (InterruptedException e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } public static List > getList() { List > resultList = new ArrayList >(); for (int i = 0; i < 99; i++) { Map tmpMap = new HashMap (); tmpMap.put("id", 1); tmpMap.put("name", "rose"); tmpMap.put("salary", 10000); resultList.add(tmpMap); } return resultList; }
4:整个代码如下
1 package com.hlcui.threadpool; 2 3 import java.util.ArrayList; 4 import java.util.HashMap; 5 import java.util.List; 6 import java.util.Map; 7 import java.util.concurrent.CountDownLatch; 8 import java.util.concurrent.ExecutorService; 9 import java.util.concurrent.Executors;10 11 public class ThreadCountDemo {12 public static final int THREAD_NUM = 10;13 public ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_NUM);// 创建10个线程放入线程池内14 15 public void doHandler() {16 try {17 List> list = getList();18 final CountDownLatch countdown = new CountDownLatch(THREAD_NUM);// 子线程数19 int oneThreadCount = list.size()/THREAD_NUM + 1;20 for (int num = 0; num < THREAD_NUM; num++) { // 开启线程个数21 System.out.println("----------------共有的线程个数:" + countdown.getCount());22 threadPool.execute(new myRun(list,num,countdown,oneThreadCount));23 }24 countdown.await();// 这里进行同步等待,等所有子线程结束后,执行 countdown.await()后面的代码25 System.out.println("##结束等待------------------------");26 } catch (InterruptedException e) {27 e.printStackTrace();28 } finally {29 threadPool.shutdown();30 }31 }32 33 public static List > getList() {34 List > resultList = new ArrayList >();35 for (int i = 0; i < 99; i++) {36 Map tmpMap = new HashMap ();37 tmpMap.put("id", 1);38 tmpMap.put("name", "rose");39 tmpMap.put("salary", 10000);40 resultList.add(tmpMap);41 }42 return resultList;43 }44 45 public class myRun implements Runnable{46 47 private List > tmpList;48 private int threadCount;49 private CountDownLatch countdown;50 private int oneThreadCount;51 52 public myRun(List > tmpList, int threadCount,53 CountDownLatch countdown,int oneThreadCount) {54 super();55 this.tmpList = tmpList;56 this.threadCount = threadCount;57 this.countdown = countdown;58 this.oneThreadCount = oneThreadCount;59 }60 61 @Override62 public void run() {63 for (int i = threadCount * oneThreadCount; i < (threadCount + 1) * oneThreadCount64 && i < tmpList.size(); i++) {65 System.out.println(Thread.currentThread().getName()+ "#" + i);66 System.out.println(tmpList.get(i));67 }68 countdown.countDown();// 每个子线程结束后进行线程减169 System.out.println(Thread.currentThread().getName()70 + "##线程结束------------------------");71 System.out.println("----------------剩余的线程个数:"72 + countdown.getCount());73 }74 75 }76 }
代码均已经验证。