Java并发编程实战笔记(4)-基础构建模块

Posted by zhidaliao on February 3, 2016

主要介绍Java类库中并发基础构建模块 & 常用模式

同步容器类

这些容器类的关键在于:将他们的状态封装起来,并对每个公有方法进行同步,使得每次只有一个线程可以访问容器的状态

存在的问题

复合操作需要同步:包括 迭代、跳转、条件运算,虽然每个接口只能一个线程操作,但是存在线程交替操作的情况,所以不能保证没有其他线程修改状态

解决:正确的客户端加锁

迭代器 与 ConcurrentModificationException

在迭代器场景中,如果有状态被修改将会抛出 ConcurrentModificationException 并发修改异常

原理:将计数器的变化与容器关联起来,如果容器在迭代,计数器被修改,那么hasNext和next操作会抛出异常。缺点:对检查并没有同步 ; 原因:对性能的权衡妥协

方案:客户端加锁,等待时间过长。使用克隆容器的方式,在克隆的时候要加锁, 克隆会造成性能开销,需要从调用频率和响应时间、容器大小等考虑

隐藏的迭代器:

如果状态与保护她的同步代码之间相隔越远,开发者就越容易忘记在访问状态时使用同步

避免下述这种情况的方式: 使用 synchronizedSet来替代并且对同步代码进行封装

Set<Integer> demoSet = new HashSet();

public void doSome(){
	system.out.println("value is "+ demoSet);
}

hasCode / equals / containsAll / retainAll / removeAll 等方法,以及把容器作为构造器参数的场景, 都会对容器进行迭代。都有可能抛出并发修改异常。

并发容器类

设计的原因:提高伸缩性并降低风险

  • 改进同步容器类的性能: 同步容器类将所有对容器状态的访问都串行化, 以实现线程安全,在大并发的情况下吞吐量将严重降低。
  • 针对多个并发访问增加了一些特性: 例如条件运算的复合操作, 不会产生并发修改异常的迭代器
ConcurrentHashMap

同步容器类在执行每一个操作都会持有一个锁,而像迭代这样的操作会耗费很长的时间,导致其他线程挂起。

  • 并发容器类在锁的粒度方面做了改进。这种机制称为 分段锁(Lock Striping),后面章节会介绍。
  • 迭代器操作具有弱一致性,可以容忍并发修改。在迭代器被创建后,出现修改的操作会反映给容器。(但是不保证)
  • ConcurrentHashMap 相对于同步Map有更多的优势和更少的劣势,只有当需要对Map进行独占操作时,才放弃对ConcurrentHashMap的使用。
CopyOnWriteArrayList

迭代期间不需要加锁或复制,不会抛出并发修改异常

  • 线程安全性在于:只要正确的发布一个事实不可变的CopyOnWriteArrayList对象,那么访问该对象时就不再需要进一步的同步
  • 每次修改时都会创建并重新发布一个新的容器副本,结束之后再将原容器的引用指向新容器。
  • CopyOnWriteArrayList容器保留一个指向底层基础数组的引用,这个数组位于迭代器的起始位置,不会被修改(不支持可变 remove等操作),对其进行同步时只需要确保数组内容的可见性
  • 开销:每次修改容器都会复制底层数组,当容器规模较大的时候需要较大开销,适用于 迭代操作远多于修改操作,比如 servlet过滤器/监听器注册表
阻塞队列 和 消费者-生产者模式
  • 将复杂操作分解成两个组建,简化业务逻辑
  • 常见场景:线程池、工作队列、Executor任务执行框架
  • 当队列为空,take操作会阻塞,特别适合 服务器接收请求的场景
  • 阻塞队列提供了 offer方法,当数据不能被添加到队列中,那么将返回一个失败状态,能够灵活的自定义处理负载的策略;
  • 设置工作队列的时候尽量使用有界队列,防止架构更改
  • 特殊的“队列”,并不会维护空间,直接交付给消费者,没有存储和FIFO操作
双端队列 与 工作密取(Work stealing)

双端队列: 在队列头和队列尾高效插入和移除。包含 DequeBlockingDeque两种类型

工作密取模式:

  • 为了高效:每个消费者都有自己的双端队列,当自己的队列任务完成之后,会从其他消费者的 双端队列的尾部(减少竞争关系)获取任务消费
  • 既是消费者又是生产者的模式: 比如网络爬虫,在解析到网页内容的时候发现了其他超链接,将链接加到队列的尾部;其他场景:搜索图算法。

阻塞方法 和 中断方法

  • 线程会因为一些原因阻塞,比如 IO操作、等待获得一个锁、从Thread.sleep中醒来等。
  • 阻塞状态包括(BLOCKED / WAITING / TIMED_WAITING) , 运行状态(RUNNABLE)

InterruptedException:

  • 受检查异常。表示一个阻塞方法被中断,它将努力提前结束阻塞状态
  • Interrupt方法,Thread中特有的: 用于中断线程 或者 查询线程是否中断
  • 中断是一种协作机制,一个A线程不能强制另一个B线程马上中断,只能要求B线程在可以中断的地方停止它的操作(前提是B愿意停下来)。

当代码中调用了一个可能 抛出InterruptedException的方法时,你自己的代码也变成了阻塞方法,并且必须要处理对中断的响应。响应的方式有两种:

  • 传递InterruptedException,避开异常,直接抛出给方法的调用者处理,或者捕获异常作简单的处理再抛出异常。
  • 保持中断状态(恢复中断):有时不能直接抛出异常,比如当代码是 Runnable的一部分时。处理方式: 捕获异常,保持线程的中断状态,在调用栈中的更高层代码将看到该线程引发了中断
    class Demo implements Runnalbel{
      public void run(){
          try{
              queue.take();
          }catch{
              Thread.currentThread().interrupt();
          }
      }
    }
    

同步工具类

  • 只要对象可以根据自身的状态来协调线程的控制流,任何对象都可以是同步工具类
  • 工具类包含一些特定的结构化属性:1、封装一些状态。这些状态将决定线程的执行阶段; 2、提供方法对状态进行操作; 3、一些方法用于高效的等待同步工具类进入到预期状态?
  • 常见的工具类包括: 队列 / 信号量(semaphore) / 栅栏(Barrier) / 闭锁(Latch)
闭锁(Latch)

可以延迟线程的进度(阻塞进程,等待时机释放),直到闭锁到达终止状态,闭锁到达终止状态将不再更新状态; 可以用来确保某些活动在其他活动完成之后才执行。常见场景:

  • 王者荣耀中兵线的初始化线程A,需要所有玩家加载游戏进度完成的线程都结束后,才会开始A
  • 服务在其依赖的所有服务启动完成之后再启动

CountDownLatch是一种闭锁实现,包括一个计数器,需要等待的时间数量,countDown方法减少计数值,await方法一直阻塞直到计数器为0,或者事件中断或超时

void test(){
	final CountDownLatch cd = new CountDownLatch(1);

	for(int i = 0 ;i < 10; i++){
		Thread thread = new Thread(){
			public void run(){
				cd.await();
				...
			}
		}
	}

	cd.countDown();
}
闭锁(FutureTask)
  • FutureTask实现了Future的语义,表示一种可生成结果的计算(通过Callable类实现);
  • FutureTask相当于一种可生成结果的 Runnable,可以处于三种状态, 等待运行 、 正在运行 、 运行完成;运行完成包括三种:正常结束、取消而结束、异常结束
  • Future.get方法:如果任务完成马上返回结果,否则一直阻塞直到完成或者异常结束。结果从执行线程返回给获取结果的这个线程
  • FutureTask在Executor框架中表示异步任务; 此外还可以用来表示时间较长的计算,可以在使用结果之前启动
class Demo{
	
	FutureTask<Product> future = new FutureTask<Product>(new Callable<Product>(){
		public Prodcut call() throws LoadException{
			return db.load();
		}
	});

	Thread thread = new Thread(future);

	void start(){
		thread.start();
	}

	Product getPro() throws LoadException,InterruptedException{
		try{
			return future.get();
		}catch(ExecutionException e){
			Throwable th = e.getCause();
			if(th instanceOf LoadException){
				throw LoadException
			}else{
				throw launderThrowable(cause); 
			}
		}
	}

}
  • A线程任务未完成能够阻塞当前B线程,实现控制流
  • Callable表示的任务可以抛出 受检查或者未受检查的异常, 并且任务代码都可能抛出一个Error
  • 无论代码中抛出什么异常,都会被封装ExecutionException中,并在调用get方法的时候抛出;ExecutionException作为 Throwable类返回,可能的情况比较复杂需要额外处理
  • 如果任务被取消,Get方法将会抛出 CancellationException
  • 对异常的处理,逻辑如下:
    • 检查已知的受检查异常,并重新抛出
    • Error:重新抛出
    • RuntimeException: 重新抛出
    • 抛出 IllegalStateException 表示这是一个逻辑错误
RuntimeException launderThrowable(Throwable cause){
	if(cause instanceOf RuntimeException){
		return (RuntimeException) cause;
	}else if(cause instanceOf Error){
		throw (Error) cause;
	}else{
		throw new IllegalStateException("checked",cause);
	}
}
信号量

定义:计数信号量用来控制同时访问:

  • 某个特定资源的操作数量
  • 某个指定操作的数量
  • 可以用来实现 资源池、容器的施加边界

原理:

  • Semaphore管理者一组虚拟许可(permit),初始数量在构造器中指定; 在执行操作的时候先获得许可,使用完成之后进行释放
  • 获取许可使用 acquire方法(理解为消费),如果没有可用的许可将会阻塞。 release方法返回一个许可(理解为创建)
  • Semaphore 不会与线程关联起来,A线程消费的信号可以在B线程创建,
  • 信号量不受限于构造过程中的创建的许可数量

适用场景:

  • 实现资源池,比如数据库连接,固定初始大小,每次获取需要阻塞等待资源(最好用 BlockingDeque实现)
  • 限定容器的大小。在容器外使用装饰器模式,并且在添加和删除的时候使用 Semaphore
class Demo{
	private final Semaphore sem = new Semaphore(2);
	
	public void test(){
		sem.acquire();
		...
		sem.release();
	}
}
栅栏(Barrier)

栅栏与闭锁有许多共性,一些区别如下:

闭锁可以控制一组相关操作,进入终止状态不能重置 栅栏状态可以重置
闭锁用于等待事件 栅栏用于等待其他线程
闭锁用于所有线程等待一个外部事件的发生 栅栏则是所有线程相互等待,直到所有线程都到达某一点时才打开栅栏,然后线程可以继续执行

单向栅栏:

常见场景:将问题分解成几个计算子任务,流程如下:

  • 当线程到达栅栏位置的时候,将会调用await方法,这个方法将阻塞直到所有的线程都到达栅栏的位置
  • 如果所有线程到达,栅栏将会打开,此时所有的线程将被释放,而栅栏将被重置等待下次使用
  • 如果对调用await方法超时,或者await阻塞的线程被中断,栅栏的完整性将被打破,所有阻塞的await调用都将终止并抛出BrokenBarrierException
  • 如果成功通过,await方法将会返回一个唯一索引,后续程序可以通过这个索引选举 leader,来执行一些特殊的操作

双向栅栏:

另一种形式的栅栏是 Exchanger: 当两方执行不对称操作的时候,例如一个线程向缓冲区写入数据,另一个线程向缓存区获取数据。这些线程可以用 Exchanger来汇合,将满的缓冲区和空的缓冲区进行交换。

练习,构建一个缓存,待完善。。。。

参考网站

Java 7之多线程并发容器 - CopyOnWriteArrayList

闭锁CountDownLatch与栅栏CyclicBarrier