Exchanger可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给exchange方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger可能被视为SynchronousQueue的双向形式。Exchanger可能在应用程序(比如遗传算法和管道设计)中很有用。
基本想法是维护一个槽指向一个结点,结点包含一个准备提供(出去)的item和一个等待填充的洞。如果一个到来的“occupying”线程看到槽是空的,CAS结点到那里并等待另一个线程发起交换。第二个“fulfilling”线程看到槽是非空的,把它CAS回空,也通过CAS洞交换item,加上唤醒occupying线程,如果它是阻塞的。在每种情况下CAS可能失败,因为一个槽一开始看起来是非空的,但在CAS时不是,或者相反,所以线程需要重试这些动作。
这个简单方法在只有少量线程使用Exchanger时工作得很好,当大量线程使用单个Exchanger时性能快速恶化,由于CAS竞争。因此我们使用一个“arena,竞技场”,基本上是一个哈希表,有动态可变数量的槽,每一个槽都可被任意线程执行交换。进入线程基于自己的线程ID选择槽,如果一个进入在它自己选择的槽上CAS失败,它选择一个供替代的槽。如果一个线程成功CAS到一个槽但没有其他线程到达,它尝试其他,前往0号槽,那里总是存在如果表收缩的话。控制这个的特殊机制如下:
根据算法描述里的一部分,做了个简单的实现。未测试。
此方法用于定位线程的默认槽。
此方法用于自旋等待,主要是默认槽是非0号槽的线程在没有线程可交换时使用。
doExchange方法是Exchanger类的核心方法,所以exchange方法极其变形最终都是调用这个方法。它的实现完全遵循前面的算法描述。
而对于java中的线程池,大家需要理解好ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor这几个类之间的关系即可。
通过看上面的这个关系图,可以知道,最核心的一个类是ThreadPoolExecutor,我们下面来具体的看一下这个类的操作。
ThreadPoolExecutor类提供了四个构造函数,如下所示
下面来分析下这四个构造函数中每个参数的具体意义,其实主要是一些初始化的操作
corePoolSize:表明这个线程池的大小
maximumPoolSize:表明这个线程池中,最多可以容纳多少个线程执行
workQueue:阻塞队列,表示如何线程多余线程池的话,用来存储等待执行的任务,具体值如下
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
threadFactory:一个线程工厂,用来创建线程
handler:用来表示拒绝执行任务的策略。
下面通过举个例子来理解上述的表示参数。
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
下面通过一个例子来了解上述的构造函数。
上面就是开启20个任务,来让线程池处理,如果线程池满了的话,就会放入到阻塞对列中进行等待操作
不过在javadoc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool();//创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();//创建容量为1的缓冲池Executors.newFixedThreadPool(int);//创建固定容量大小的缓冲池
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
下面是8位Java牛人,他们为Java社区编写框架、产品、工具或撰写书籍改变了Java编程的方式。
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
阻塞队列的实现方法有许多:
经典同步队列实现采用了三个信号量,代码很简单,比较容易理解:
Java5的实现相对来说做了一些优化,只使用了一个锁,使用队列代替信号量也可以允许发布者直接发布数据,而不是要首先从阻塞在信号量处被唤醒。
代码实现里的DualQueue或Stack内部是用链表(LinkedList)来实现的,其节点状态为以下三种情况:
这个算法的特点就是任何操作都可以根据节点的状态判断执行,而不需要用到锁。
其核心接口是Transfer,生产者的put或消费者的take都使用这个接口,根据第一个参数来区别是入列(栈)还是出列(栈)。
TransferQueue实现如下(摘自Java6源代码),入列和出列都基于Spin和CAS方法: