16boke - 一路博客

Java线程池及CountDownLatch的使用

Java多线程除了可以直接使用Thread和Runnable来实现外,jdk5以后提供了一种更方便的使用方式:线程池(Executors),本章暂不介绍什么是线程池,以及线程池的分类和使用。主要介绍线程池在实际工作的统计使用,并且介绍java并发包中同步锁的实现:CountDownLatch。

1、什么是CountDownLatch

直译过来就是倒计数(CountDown)门闩(Latch)。倒计数不用说,门闩的意思顾名思义就是阻止前进。在这里就是指 CountDownLatch.await() 方法在倒计数为0之前会阻塞当前线程。CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作。例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工作都完成,主线程才能继续它的工作。这些准备工作彼此独立,所以可以并发执行以提高速度。在这个场景下就可以使用 CountDownLatch 协调线程之间的调度了。在直接创建线程的年代(Java 5.0 之前),我们可以使用 Thread.join()。在 JUC 出现后,因为线程池中的线程不能直接被引用,所以就必须使用 CountDownLatch 了。CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

2、实际案例

现在有一个需求,遍历生产环境中某一台机器上的8个redis的master实例,解析遍历出来的1W条记录,并取出dev和ver字段,将这两个字段拼接到url上,并请求第三方服务进行查询,判断另一个库中是否存在dev和ver对应的记录,并将结果进行返回。最终统计:总查询记录数(有可能redis中不到1W条),未查询的记录数。

在这个例子中就是一个典型的线程池及同步的使用案例,分8个线程分别进行统计,只有当每个线程执行完之后才进行最终的结果汇总。

3、具体代码

package com.ua;

import java.net.URLEncoder;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang.StringUtils;

import com.util.HttpUtil;

import net.sf.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;

/**
 * ua统计
 */
public class UATest {
	Jedis jedis = new Jedis("192.168.1.5", 19001);
	int errorSum = 0;
	int sum = 0;
	Set<String> set = new HashSet<>();
	
	/**
	 * 最多只统计1W条记录
	 * @param jedis
	 */
	public void scan(Jedis jedis) {
		String key = "上海:aphone";
		String cursor = "";
		ScanResult<String> scan = null;
		List<String> list = null;
		while (!cursor.equals("0")) {
			scan = jedis.sscan(key, cursor);
			list = scan.getResult();
			for (String str : list) {
				if (sum >= 10000) {
					break;
				}
				getUid(str);
				sum++;
			}
			cursor = scan.getStringCursor();
		}
	}

	/**
	 * 解析统计出来的每条记录,获取dev和ver节点的值
	 * @param uid
	 */
	public void getUid(String uid) {
		String res = jedis.get(uid);
		if (!StringUtils.isEmpty(res)) {
			JSONObject jsonObject = JSONObject.fromObject(res);
			String dev = jsonObject.getString("dev");
			String ver = jsonObject.getString("ver");
			getUa(dev, ver);
		}
	}

	/**
	 * 请求第三方接口判断dev和ver的组合是否存在
	 * @param dev
	 * @param ver
	 */
	public void getUa(String dev, String ver) {
		String res;
		try {
			res = HttpUtil.getHttp("http://localhost:3000/getUa?test=true&dev=" + URLEncoder.encode(dev,"utf-8") + "&version=" + ver);
			if(res.indexOf("Error")>=0){
				System.out.println(Thread.currentThread().getName()+" : "+res);
				set.add(dev+"|"+ver);
				errorSum++;
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		//定义线程池大小为8,因为有8个redis的master实例
		ExecutorService pool = Executors.newFixedThreadPool(8);
		//结束的倒数锁,因为线程池有8个线程,所以也定义8个线程结束锁计数
		final CountDownLatch latch = new CountDownLatch(8);
		final UATest uaTest = new UATest();
		for (int i = 0; i < 8; i++) {
			final int tmp = i;
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					try{
						uaTest.scan(new Jedis("192.168.1.5", 6381 + tmp));
					}finally{
						//有一个线程进来就减1
						latch.countDown();
					}
				}
			};
			pool.submit(runnable);
		}
		//第一种阻塞统计方法:采用countlatch的递减方法,同时执行await()方法进行阻塞,每一个线程执行完就进行递减,直到为0才继续执行最终的统计输出代码
		try {
			//阻塞,直到latch为0才执行下面的输出语句
			latch.await();
			System.out.println("所有线程执行完毕!");
			System.out.println("用户总数:"+uaTest.sum+",未找到ua的个数:"+uaTest.errorSum);
			System.out.println(uaTest.set);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		pool.shutdown();
		/*
		 * 第二种统计方法:这种方法不是通过阻塞然后判断线程是否全部执行完来输出最终结果。而是通过调用线程池的isTerminated()方法来判断线程池是否已经完全结束,注意前提是需要调用线程池的shutdown()方法。
		 * 通过死循环的方式来判断整个线程池是否结束,如果结束就输出统计结果并结束死循环
		while(true){
			if(pool.isTerminated()){
				System.out.println("所有线程执行完毕!");
				System.out.println("用户总数:"+uaTest.sum+",未找到ua的个数:"+uaTest.errorSum);
				System.out.println(uaTest.set);
				break;
			}
		}*/
	}
}

4、代码解析

  • public void countDown()

递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少。如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程。

如果当前计数等于零,则不发生任何操作。

  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException

使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。

如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直处于休眠状态:

由于调用 countDown() 方法,计数到达零;或者

其他某个线程中断当前线程;或者

已超出指定的等待时间。

如果计数到达零,则该方法返回 true 值。

如果当前线程:

在进入此方法时已经设置了该线程的中断状态;或者

在等待时被中断,

则抛出 InterruptedException,并且清除当前线程的已中断状态。如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于零,则此方法根本不会等待。

参数:

timeout - 要等待的最长时间

unit - timeout 参数的时间单位。

返回:

如果计数到达零,则返回 true;如果在计数到达零之前超过了等待时间,则返回 false

抛出:

InterruptedException - 如果当前线程在等待时被中断

  • pool.isTerminated()

判断线程池是否被关闭,并且没有线程正在执行。通过在while循环中调用这个方法可以实现判断线程池是否结束,然后可以进行统计

Java  J2EE