多线程(三)线程的并发工具类

多线程(三)线程的并发工具类

1. Fork/Join框架

1.什么是分而治之?

规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

图片3

1.2 Fork/Join使用的标准范式

图片5

MyTask类需要继承RecursiveTask/RecursiveAction 其中RecursiveTask类的compute方法需要返回值 而RecursiveAction中的compute方法不需要返回值

1.2.1 Fork/Join的同步用法同时演示返回结果值:统计整型数组中所有元素的和。


生成指定长度随机数组的工具类

public class MakeArray {
    //数组长度
    public static final int ARRAY_LENGTH  = 1000;

    public static int[] makeArray() {

        //new一个随机数发生器
        Random r = new Random();
        int[] result = new int[ARRAY_LENGTH];
        for(int i=0;i<ARRAY_LENGTH;i++){
            //用随机数填充数组
            result[i] =  r.nextInt(ARRAY_LENGTH*3);
        }
        return result;

    }
}


public class SumArray {
    private static class SumTask extends RecursiveTask<Integer>{

        //定义阈值为数组长度的1/10
        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;

        private int[] src;//需要进行计算的数组

        private int fromIndex;//开始计算的下标

        private int endIndex;//结束计算的下标

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.endIndex = toIndex;
        }

		@Override
		protected Integer compute() {
           //如果数组的长度小于阈值则计算
           if (endIndex - fromIndex < THRESHOLD){
               int count = 0;
               for(int i = fromIndex; i <= endIndex; i++){
                   SleepTools.ms(1);
                   count = count + src[i];
               }
               return count;
           }else{
               //如果数组的长度大于阈值则继续拆分数组 这里从中间将数组拆分为左右两个部分
               int mid = (fromIndex + endIndex) / 2;
               //将任务拆分为左右两个部分
               SumTask left = new SumTask(src, fromIndex, mid);
               SumTask right = new SumTask(src, mid + 1, endIndex);
               //执行所有的任务
               invokeAll(left, right);
               //将所有任务的结果聚合在一起
               return left.join() + right.join();
           }
		}
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();
        SumTask innerFind = new SumTask(src, 0, src.length - 1);
        long start = System.currentTimeMillis();
        forkJoinPool.invoke(innerFind);//同步调用
        System.out.println("Task is Running.....");
        System.out.println("The count is " + innerFind.join()
                +" spend time:"+ (System.currentTimeMillis() -start) + "ms");
    }
}

1.2.2 image.png

package pers.amos.concurrent.ch2.forkjoin;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 *@author Mark老师   享学课堂 https://enjoy.ke.qq.com 
 *
 *类说明:遍历指定目录(含子目录)找寻指定类型文件
 */
public class FindDirsFiles extends RecursiveAction{

    private File path;//当前任务需要搜寻的目录

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一个 ForkJoinPool 实例调度总任务
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            pool.execute(task);//异步调用

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork="+otherWork);
            task.join();//阻塞的方法
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

	@Override
	protected void compute() {

        File[] files = path.listFiles();
        List<FindDirsFiles> subTasks = new ArrayList<>();
        if (files != null){
            for (File file : files){
                //如果是目录则将它添加到继续遍历的任务列表
                if (file.isDirectory()){
                    subTasks.add(new FindDirsFiles(file));
                }else{
                    //不是目录 则打印出文件信息
                    if (file.getName().endsWith(".txt"))
                    System.out.println("文件-" + file.getAbsoluteFile());
                }
            }
            if(! subTasks.isEmpty()){
                //invokeAll()返回Collection<T>的子任务执行结果
                for (FindDirsFiles subTask : invokeAll(subTasks)){
                    //将子任务的结果组合在一起
                    subTask.join();//等待子任务执行完成
                }
            }
        }



    }
}

2.常用的线程并发工具类CountDownLatch

2.1 CountDownLatch简介

  • CountDownLatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
  • 例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。
  • 常用的方法有以下几个
CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数
countDown() // 计数减一
await() //等待,当计数减到0时,所有线程并行执行

2.2 CountDownLatch的原理

  • CountDownLatch 是通过一个计数器来实现的,计数器的初始值(一般)为线程的数量(初始值也可以大于线程的数量)。
    • 每当一个线程完成了自己的任务后,计数器的值就会减 1。
    • 当计数器值到达 0 时,表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。 image.png 构造函数:
//Constructs a CountDownLatch initialized with the given count.
public CountDownLatch(int count) {...}
  • 这个计数(count)本质上是闭锁需要等待的线程数。 此值只能设置一次,并且 CountDownLatch 不提供任何其他机制来重置此计数。
  • 第一次与 CountDownLatch 的交互是与等待其他线程的主线程进行的。
    • 此主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法,这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务为止。
  • 其他 N 个线程必须引用闭锁对象,因为它们如果完成了任务需要通知 CountDownLatch 对象。
    • 此通知是通过 CountDownLatch.countDown() 方法完成的,每次调用计数减少 1。
    • 当所有 N 个线程都调用了这个方法时,计数将达到 0,主线程可以在 await() 方法之后继续执行。

代码示例: 业务线程和主线程必须等待初始化线程完成后才能够进行工作。


public class UseCountDownLatch {

	//初始化 6个扣分点
	private static CountDownLatch latch = new CountDownLatch(6);

	//初始化线程
	private static class InitThread implements Runnable {
        @Override
        public void run() {
            System.out.println("初始化线程 [thread_" + Thread.currentThread().getId() + "]准备工作");
            //计数器减1
            latch.countDown();
            System.out.println("初始化线程[thread_" + Thread.currentThread().getId() + "]继续工作");
        }
    }

    //业务线程
    private static class BusiThread implements Runnable {
        @Override
        public void run() {
            try {
                //等待其他所有初始化线程完成工作
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("业务线程[thread_" + Thread.currentThread().getId() + "]开始工作");
        }
    }

    //5个初始化线程 6个扣除点
    public static void main(String[] args) throws InterruptedException {
        //单独的初始化线程,初始化分为2步,需要扣减两次 这说明CountDownLatch初始值可以大于线程数 
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("单独创建的线程[thread_" + Thread.currentThread().getId() + "]开始第一步工作");
                latch.countDown();
                SleepTools.ms(1);
                System.out.println("单独创建的线程[thread_" + Thread.currentThread().getId() + "]开始第二步工作");
                latch.countDown();
            }
        }).start();

    	//创建1个业务线程
        new Thread(new BusiThread()).start();

        //创建4个初始化线程 共扣除4次
        for(int i = 0; i <= 3; i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }
        //主线程与业务线程一起等待其他线程执行
        latch.await();
        System.out.println("主线程开始工作..");
    }
}

执行结果

初始化线程先完成工作 然后主线程和任务线程才能够运行

初始化线程 [thread_13]准备工作
初始化线程[thread_13]继续工作
初始化线程 [thread_14]准备工作
初始化线程[thread_14]继续工作
初始化线程 [thread_15]准备工作
初始化线程[thread_15]继续工作
初始化线程 [thread_16]准备工作
初始化线程[thread_16]继续工作
单独创建的线程[thread_11]开始第一步工作
单独创建的线程[thread_11]开始第二步工作
主线程开始工作..
业务线程[thread_12]开始工作

应用场景:CountDownLatch 类主要使用的场景有明显的顺序要求。 比如只有等跑完步才能计算排名,只有等所有记录都写入才能进行统计工作等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。

3.CyclicBarrier

3.1 简介

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

3.2 构造方法

image.png 通过类图我们可以看到,CyclicBarrier内部使用了ReentrantLock和Condition两个类。它有两个构造函数:

public CyclicBarrier(int parties) {
    this(parties, null);
}
 
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

3.3 await()方法

调用await方法的线程告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。直到parties个参与线程调用了await方法,CyclicBarrier同样提供带超时时间的await和不带超时时间的await方法。

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

代码示例

public class CyclicBarrierTest {
	// 自定义工作线程
	private static class Worker extends Thread {
		private CyclicBarrier cyclicBarrier;
		
		public Worker(CyclicBarrier cyclicBarrier) {
			this.cyclicBarrier = cyclicBarrier;
		}
		
		@Override
		public void run() {
			super.run();
			
			try {
				System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
				cyclicBarrier.await();
				System.out.println(Thread.currentThread().getName() + "开始执行");
				// 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + "执行完毕");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
 
	public static void main(String[] args) {
		int threadCount = 3;
		CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
		
		for (int i = 0; i < threadCount; i++) {
			System.out.println("创建工作线程" + i);
			Worker worker = new Worker(cyclicBarrier);
			worker.start();
		}
	}
}

运行结果

创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕

3.4 CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;

  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

参考博客:【Java 并发笔记】CountDownLatch 相关整理
                       Java并发编程之CyclicBarrier详解

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×