首页 >> 大全

《Java并发编程实战》【第二部分 结构化并发应用程序】

2023-07-29 大全 47 作者:考证青年

文章目录 6.2 框架 6.3 找出可利用的并行性 第7章 取消与关闭 7.2 停止基于线程的服务 7.3 处理非正常的线程终止 7.4 JVM关闭 第8章 线程池的使用 8.2 设置线程池的大小8.3 配置 8.4 扩展 8.5 递归算法的并行化 第9章 图形用户界面应用程序 9.2 短时间的GUI任务9.3 长时间的GUI任务 9.4 共享数据模型 9.5 其他形式的单线程子系统

第6章 任务执行

大多数并发应用程序都是围绕“任务执行(Task )” 来构造的:任务通常是一些抽象的且离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。

6.1 在线程中执行任务

当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。独立性有助于实现并发,因为如果存在足够多的处理资源,那么这些独立的任务都可以并行执行。为了在调度与负载均衡等过程中实现更高的灵活性,每项任务还应该表示应用程序的一小部分处理能力。

在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。而且,当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。要实现上述目标,应该选择清晰的任务边界以及明确的任务执行策略(请参见6.2.2节)。

大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。Web服务器、邮件服务器、文件服务器、EJB容器以及数据库服务器等,这些服务器都能通过网络接受远程客户的连接请求。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。例如,在向邮件服务器提交一个消息后得到的结果,并不会受其他正在处理的消息影响,而且在处理单个消息时通常只需要服务器总处理能力的很小一部分。

6.1.1 串行的执行任务

在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜在的并发性。最简单的策略就是在单个线程中串行地执行各项任务。程序清单6-1中的r将串行地处理它的任务(即通过80端口接收到的HTTP请求)。至于如何处理请求的细节问题,在这里并不重要,我们感兴趣的是如何表征不同调度策略的同步特性。

程序清单6-1串行的Web服务器

class SingleThreadWebServer {public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80);while (true) {Socket connetion = socket.accept();handlerRequest(connection);}}
}

r很简单,且在理论上是正确的,但在实际生产环境中的执行性能却很糟糕,因为它每次只能处理一个请求。主线程在接受连接与处理相关请求等操作之间不断地交替运行。当服务器正在处理请求时,新到来的连接必须等待直到请求处理完成,然后服务器将再次调用。如果处理请求的速度很快并且可以立即返回,那么这种方法是可行的,但现实世界中的Web服务器的情况却并非如此。

在Web请求的处理中包含了一组不同的运算与I/O操作。服务器必须处理套接字I/O以读取请求和写回响应,这些操作通常会由于网络拥塞或连通性问题而被阻塞。此外,服务器还可能处理文件I/O或者数据库请求,这些操作同样会阻塞。在单线程的服务器中,阻塞不仅会推迟当前请求的完成时间,而且还将彻底阻止等待中的请求被处理。如果请求阻塞的时间过长,用户将认为服务器是不可用的,因为服务器看似失去了响应。同时,服务器的资源利用率非常低,因为当单线程在等待I/O操作完成时,CPU将处于空闲状态。

在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。也有一些例外,例如,当任务数量很少且执行时间很长时,或者当服务器只为单个用户提供服务,并且该客户每次只发出一个请求时——但大多数服务器应用程序并不是按照这种方式来工作的。【在某些情况中,串行处理方式能带来简单性或安全性。大多数GUI框架都通过单一的线程来串行地处理任务。我们将在第9章再次介绍串行模型。】

6.1.2 显式地为任务创建线程

通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性,如程序清单6-2中的er所示。

程序清单6-2在Web服务器中为每个请求启动一个新的线程(不要这么做)

class ThreadPerTaskWebServer {public static void main(String[] args) {ServerSocket socket = new ServerSocket(80);while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {@Override public void run() {handlerRequest(connection);}};new Thread(task).start();}}
}

er在结构上类似于前面的单线程版本——主线程仍然不断地交替执行“接受外部连接”与“分发请求”等操作。区别在于,对于每个连接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。由此可得出3个主要结论:

6.1.3 无限制创建线程的不足

在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量的线程时:

线程生命周期的开销非常高。 线程的创建与销毁并不是没有代价的。根据平台的不同,实际的开销也有所不同,但线程的创建过程都会需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助操作。如果请求的到达率非常高且请求的处理过程是轻量级的,例如大多数服务器应用程序就是这种情况,那么为每个请求创建一个新线程将消耗大量的计算资源。

资源消耗。 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销。如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。

稳定性。 在可创建线程的数量上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约,包括JVM的启动参数、构造函数中请求的栈大小,以及底层操作系统对线程的限制等【在32位的机器上,其中一个主要的限制因素是线程栈的地址空间。每个线程都维护两个执行栈,一个用于Java代码,另一个用于原生代码。通常,JVM在默认情况下会生成一个复合的栈,大小约为0.5MB。(可以通过JVM标志-Xss或者通过的构造函数来修改这个值。)如果将223除以每个线程的栈大小,那么线程数量将被限制为几千到几万。其他一些因素,例如操作系统的限制等,则可能会施加更加严格的约束。】。如果破坏了这些限制,那么很可能抛出异常,要想从这种错误中恢复过来是非常危险的,更简单的办法是通过构造程序来避免超出这些限制。

在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃。要想避免这种危险,就应该对应用程序可以创建的线程数量进行限制,并且全面地测试应用程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。

“为每个任务分配一个线程”这种方法的问题在于,它没有限制可创建线程的数量,只限制了远程用户提交HTTP请求的速率。与其他的并发危险一样,在原型设计和开发阶段,无限制地创建线程或许还能较好地运行,但在应用程序部署后并处于高负载下运行时,才会有问题不断地暴露出来。因此,某个恶意的用户或者过多的用户,都会使Web服务器的负载达到某个阈值,从而使服务器崩溃。如果服务器需要提供高可用性,并且在高负载情况下能平缓地降低性能,那么这将是一个严重的故障。

6.2 框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。我们已经分析了两种通过线程来执行任务的策略,即把所有任务放在单个线程中串行执行,以及将每个任务放在各自的线程中执行。这两种方式都存在一些严格的限制:串行执行的问题在于其糟糕的响应性和吞吐量,而“为每个任务分配一个线程”的问题在于资源管理的复杂性。

在第5章中,我们介绍了如何通过有界队列来防止高负荷的应用程序耗尽内存。线程池简化了线程的管理工作,并且java.util.提供了一种灵活的线程池实现作为框架的一部分。在Java类库中,任务执行的主要抽象不是,而是,如程序清单6-3所示。

程序清单6-接口

public interface Executor {void execute(Runnable command);
}

虽然是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用来表示任务。的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用。

6.2.1 示例 基于的Web服务器

基于来构建Web服务器是非常容易的。在程序清单6-4中用代替了硬编码的线程创建过程。在这种情况下使用了一种标准的实现,即一个固定长度的线程池,可以容纳100个线程。

基于来构建Web服务器是非常容易的。在程序清单6-4中用代替了硬编码的线程创建过程。在这种情况下使用了一种标准的实现,即一个固定长度的线程池,可以容纳100个线程。

class TaskExecutionWebServer {private static final int NTHREADS = 100;private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(80); while (true) {final Socket connection = socket.accept();Runnable task = new Runnable() {@Overridepublic void run() {handleRequest(connection);}};exec.execute(task);}}
}

在er中,通过使用,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的实现,就可以改变服务器的行为。改变实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。通常,的配置是一次性的,因此在部署阶段可以完成,而提交任务的代码却会不断地扩散到整个程序中,增加了修改的难度。

我们可以很容易地将er修改为类似er的行为,只需使用一个为每个请求都创建新线程的。编写这样的很简单,如程序清单6-5中的r所示。

我们可以很容易地将er修改为类似er的行为,只需使用一个为每个请求都创建新线程的。编写这样的很简单,如程序清单6-5中的r所示。

public class ThreadPerTaskExecutor implements Executor {@Overridepublic void execute(Runnable r) {new Thread(r).start();}
}

同样,还可以编写一个使er的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回,如程序清单6-6中的所示。

程序清单6-6在调用线程中以同步方式执行所有任务的

public class WithinThreadExecutor implements Executor {@Overridepublic void execute(Runnable r) {r.run();}
}

6.2.2 执行策略

通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的“What、Where、When、How”等方面,包括:

new Thread(runnable).start();

并且你希望获得一种更灵活的执行策略时,请考虑使用来代替。

6.2.3 线程池

线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程( )的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

“在线程池中执行任务”比“为每个任务分配一个线程”优势更多。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用中的静态工厂方法之一来创建一个线程池:

。将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的而结束,那么线程池会补充一个新的线程)。

**。**将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。

**tor。**tor是一个单线程的,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。tor能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。【单线程的还提供了大量的内部同步机制,从而确保了任务执行的任何内存写入操作对于后续任务来说都是可见的。这意味着,即使这个线程会不时地被另一个线程替代,但对象总是可以安全地封闭在“任务线程”中。】

**ol。**ol创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer(参见6.2.5节)。

和这两个工厂方法返回通用的-实例,这些实例可以直接用来构造专门用途的。我们将在第8章中深入讨论线程池的各个配置选项。

从“为每任务分配一个线程”策略变成基于线程池的策略,将对应用程序的稳定性产生重大的影响:Web服务器不会再在高负载情况下失败【尽管服务器不会因为创建了过多的线程而失败,但在足够长的时间内,如果任务到达的速度总是超过任务执行的速度,那么服务器仍有可能(只是更不易)耗尽内存,因为等待执行的队列将不断增长。可以通过使用一个有界工作队列在框架内部解决这个问题(参见8.3.2节)】。由于服务器不会创建数千个线程来争夺有限的CPU和内存资源,因此服务器的性能将平缓地降低。通过使用,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。

6.2.4 的生命周期

我们已经知道如何创建一个,但并没有讨论如何关闭它。的实现通常会创建线程来执行任务。但JVM只有在所有(非守护)线程全部终止后才会退出。因此,如果无法正确地关闭,那么JVM将无法结束。

由于以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。当关闭应用程序时,可能采用最平缓的关闭形式(完成所有已经启动的任务,并且不再接受任何新的任务),也可能采用最粗暴的关闭形式(直接关掉机房的电源),以及其他各种可能的形式。既然是为应用程序提供服务的,因而它们也是可关闭的(无论采用平缓的方式还是粗暴的方式),并将在关闭操作中受影响的任务的状态反馈给应用程序。

为了解决执行服务的生命周期问题,扩展了接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。在程序清单6-7中给出了中的生命周期管理方法。

程序清单6-中的生命周期管理方法

public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;// ... 其他用于任务提交的便利方法
}

的生命周期有3种状态:运行、关闭和已终止。在初始创建时处于运行状态。方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

在关闭后提交的任务将由“拒绝执行处理器( )”来处理(请参见8.3.3节),它会抛弃任务,或者使得方法抛出一个未检查的-。等所有任务都完成后,将转入终止状态。可以调用来等待到达终止状态,或者通过调用来轮询是否已经终止。通常在调用之后会立即调用,从而产生同步地关闭的效果。(第7章将进一步介绍的关闭和任务取消等方面的内容。)

程序清单6-8的通过增加生命周期支持来扩展Web服务器的功能。可以通过两种方法来关闭Web服务器:在程序中调用stop,或者以客户端请求形式向Web服务器发送一个特定格式的HTTP请求。

程序清单6-8支持关闭操作的Web服务器

class LifecycleWebServer {private final ExecutorService exec = Executors.newFixedThreadPool(10);public void start() throws IOException {ServerSocket socket = new ServerSocket(80);while (!exec.isShutdown()) {try {final Socket conn = socket.accept();exec.execute(new Runnable() {@Overridepublic void run() {handlerRequest(conn);}}); } catch (Exception ex) {if (!exec.isShutdown()) {log("task submission rejected", ex);}}}}public void stop() { exec.shutdown(); }void handlerRequest(Socket connection) {Request req = readRequest(connection);if (isShutdownRequest(req)) {stop();} else {dispatchRequest(req);}}
}

6.2.5 延迟任务与周期任务

Timer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“每l0ms执行一次该任务”)。然而,Timer存在一些缺陷,因此应该考虑使用来代替它【Timer支持基于绝对时间而不是相对时间的调度机制,因此任务的执行对系统时钟变化很敏感,而只支持基于相对时间的调度。】。可以通过的构造函数或ol工厂方法来创建该类的对象。

Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他的定时精确性。例如某个周期需要每l0ms执行一次,而另一个需要执行40ms,那么这个周期任务或者在40ms任务执行完成后快速连续地调用4次,或者彻底“丢失”4次调用(取决于它是基于固定速率来调度还是基于固定延时来调度)。线程池能弥补这个缺陷,它可以提供多个线程来执行延时任务和周期任务。

Timer的另一个问题是,如果抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的将不会再执行,新的任务也不能被调度。(这个问题称之为“线程泄漏[]”,7.3节将介绍该问题以及如何避免它。)

在程序清单6-9的中给出了Timer中为什么会出现这种问题,以及如何使得试图提交的调用者也出现问题。你可能认为程序会运行6秒后退出,但实际情况是运行1秒就结束了,并抛出了一个异常消息“Timer ”。能正确处理这些表现出错误行为的任务。在Java 5.0或更高的JDK中,将很少使用Timer。

如果要构建自己的调度服务,那么可以使用,它实现了,并为提供调度功能。管理着一组对象。每个对象都有一个相应的延迟时间:在中,只有某个元素逾期后,才能从中执行take操作。从中返回的对象将根据它们的延迟时间进行排序。

6.3 找出可利用的并行性

框架帮助指定执行策略,但如果要使用,必须将任务表述为一个。在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的,例如在很多桌面应用程序中。即使是服务器应用程序,在单个客户请求中仍可能存在可发掘的并行性,例如数据库服务器。(请参见[CPJ 4.4.1.1]了解在选择任务边界时的各种权衡因素及相关讨论。)

程序清单6-9错误的Timer行为

public class OutOfTime {public static void main(String[] args) {Timer timer = new Timer();timer.schedule(new ThrowTask() ,1);SECONDS.sleep(1);timer.schedule(new ThrowTask(), 1);SECONDS.sleep(5);}static class ThrowTask extends TimerTask {@Override public void run() { throw new RuntimeException(); }}
}

本节中我们将开发一些不同版本的组件,并且每个版本都实现了不同程度的并发性。该示例组件实现浏览器程序中的页面渲染(Page-)功能,它的作用是将HTML页面绘制到图像缓存中。为了简便,假设HTML页面只包含标签文本,以及预定义大小的图片和URL。

6.3.1 示例 串行的页面渲染器

最简单的方法就是对HTML文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中。这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。

另一种串行执行方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。在程序清单6-10的中给出了这种方法。

图像下载过程的大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作。因此,这种串行执行方法没有充分地利用CPU,使得用户在看到最终页面之前要等待过长的时间。通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和响应灵敏度。

程序清单6-10串行地渲染页面元素

public class SingleThreadRenderer {void renderPage(CharSequence source) {renderText(source);List<ImageData> imageData = new ArrayList<ImageData>();for (ImageInfo imageInfo : scanForImageInfo(source))imageData.add(imageInfo);for (imageData data : imageData)renderImage(data);}
}

6.3.2 携带结果的任务与

框架使用作为其基本的任务表示形式。是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。

许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。【要使用来表示无返回值的任务,可使用<Void>。】在中包含了一些辅助方法能将其他类型的任务封装为一个,例如和java..。

和描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。(第7章将进一步介绍取消操作。)

表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在程序清单6-11中给出了和。在规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像的生命周期一样。当某个任务完成后,它就永远停留在“完成”状态上。

get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为并重新抛出。如果任务被取消,那么get将抛出n。如果get抛出了,那么可以通过来获得被封装的初始异常。

序清单6-与接口

public interface Callable<V> {V call() throws Exception;
}
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);boolean isCancelled();boolean isDnoe();V get() throws InterruptedException, ExceutionException, CancellationExcepiton;V get(long timeout, TimeUnit unit) throws InterruptedException, ExceutionException, CancellationExcepiton, TimeoutException;
}

可以通过许多种方法创建一个来描述任务。中的所有方法都将返回一个,从而将一个或提交给,并得到一个用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的或实例化一个。(由于实现了,因此可以将它提交给来执行,或者直接调用它的run方法。)

从Java 6开始,实现可以改写ice中的方法,从而根据已提交的或来控制的实例化过程。在默认实现中仅创建了一个新的,如程序清单6-12所示。

程序清单6-中的默认实现

protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) {return new FutureTask<T>(task);
}

在将或提交到的过程中,包含了一个安全发布过程(请参见3.5节),即将或从提交线程发布到最终执行任务的线程。类似地,在设置结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。

6.3.3 示例 使用实现页面渲染器

为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,另一个是下载所有的图像。(因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能。)

和有助于表示这些协同任务之间的交互。在程序清单6-13的-中创建了一个来下载所有的图像,并将其提交到一个。这将返回一个描述任务执行情况的。当主任务需要图像时,它会等待.get的调用结果。如果幸运的话,当开始请求时所有图像就已经下载完成了,即使没有,至少图像的下载任务也已经提前开始了。

程序清单6-13使用等待图像下载

public class FutureRenderer {private final ExecutorService executor = Executors.newFixedThreadPool(10);void renderPage(CharSequence source) {final List<ImageInfo> imageInfos = scanForImageInfo(source);Callable<List<ImageDate>> task = new Callable<List<ImageDate>>() {@Overridepublic List<ImageDate> call() throws Exception {List<ImageData> result = new ArrayList<>();for (ImageInfo imageInfo : imageInfos)result.add(imageInfo.downloadImage());return result;}};Future<List<ImageData>> future = executor.submit(task);renderText(sources);try {List<ImageData> imageData = future.get();for (ImageData data : imageData)renderImage(data);  } catch (InterruptedException ex) {// 重新设置线程的中断状态Thread.currentThread().interrupt();// 由于不需要结果,因此取消任务future.cancel(true);} catch (ExecutionException ex) {throw launderThrowable(ex.getCause());}}
}

get方法拥有“状态依赖”的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的安全发布属性也确保了这个方法是线程安全的。.get的异常处理代码将处理两个可能的问题:任务遇到一个,或者调用get的线程在获得结果之前被中断(请参见5.5.2节和5.4节)。 使得渲染文本任务与下载图像数据的任务并发地执行。当所有图像下载完后,会显示到页面上。这将提升用户体验,不仅使用户更快地看到结果,还有效利用了并行性,但我们还可以做得更好。用户不必等到所有的图像都下载完成,而希望看到每当下载完一幅图像时就立即显示出来。

6.3.4 在异步任务并行化中存在的局限

在上个示例中,我们尝试并行地执行两个不同类型的任务——下载图像与渲染页面。然而,通过对异构任务进行并行化来获得重大的性能提升是很困难的。

两个人可以很好地分担洗碗的工作:其中一个人负责清洗,而另一个人负责烘干。然而,要将不同类型的任务平均分配给每个工人却并不容易。当人数增加时,如何确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事情。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法带来的好处将减少。

当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。如果将两个任务A和B分配给两个工人,但A的执行时间是B的10倍,那么整个过程也只能加速9%。最后,当在多个工人之间分解任务时,还需要一定的任务协调开销:为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。

使用了两个任务,其中一个负责渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像的速度(可能性很大),那么程序的最终性能与串行执行时的性能差别不大,而代码却变得更复杂了。当使用两个线程时,至多能将速度提高一倍。因此,虽然做了许多工作来并发执行异构任务以提高并发度,但从中获得的并发性却是十分有限的。(在11.4.2节和11.4.3节中的示例说明了同一个问题。)

只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

6.3.5 :与

如果向提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的,然后反复使用get方法,同时将参数指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务()。

将和的功能融合在一起。你可以将任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为。rvice实现了,并将计算部分委托给一个。

rvice的实现非常简单。在构造函数中创建一个来保存计算完成的结果。当计算完成时,调用-Task中的done方法。当提交某个任务时,该任务将首先包装为一个,这是的一个子类,然后再改写子类的done方法,并将结果放入中,如程序清单6-14所示。take和poll方法委托给了,这些方法会在得出结果之前阻塞。

程序清单6-14由rvice使用的类

private class QueueingFuture<V> extends FutureTask<V> {QueueingFuture<Callable<V> c) { super(c); }QueueingFuture(Runnable t, V r) { super(t, r); }protected void done() { completionQueue.add(this); }
}

6.3.6 示例 使用实现页面渲染器

可以通过从两个方面来提高页面渲染器的性能:缩短总运行时间以及提高响应性。为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程:这将减少下载所有图像的总时间。此外,通过从中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。如程序清单6-15的所示。 程序清单6-15使用,使页面元素在下载完成后立即显示出来

public class Renderer {private final ExecutorService executor;Renderer(ExecutorService executor) { this.executor = executor; }void renderPage(CharExpression source) {List<ImageInfo> info = sacnForImageInfo(source);CompletionService<ImageData> completionService = new ExecutorCompletionService<>();for (final ImageInfo imageInfo : info) {completionService.submit(new Callable<ImageData>() {@Overridepublic ImageData call() throws Exception {return imageInfo.downloadImage();}});}rendderText(source);try {for (int t = 0, n = info.size(); t < n; t++) {Future<ImageData> f = completionService.take();ImageData imageData = f.get();renderImage(imageData);}} catch (Exception ex) {Thread.currentThread().interrupt();  } catch (ExecutionException ex) {throw launderThrowable(ex.getCause());  }}  
}

多个rvice可以共享一个,因此可以创建一个对于特定计算私有,又能共享一个公共的rvice。因此,的作用就相当于一组计算的句柄,这与作为单个计算的句柄是非常类似的。通过记录提交给的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个共享的,也能知道已经获得了所有任务结果的时间。

6.3.7 为任务设置时限

有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。例如,某个Web应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒钟内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。类似地,一个门户网站可以从多个数据源并行地获取数据,但可能只会在指定的时间内等待数据,如果超出了等待时间,那么只显示已经获得的数据。

在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出。

在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。此时可再次使用,如果一个限时的get方法抛出了,那么可以通过来取消任务。如果编写的任务是可取消的(参见第7章),那么可以提前中止它,以免消耗过多的资源。在程序清单6-13和6-16的代码中使用了这项技术。

程序清单6-16给出了限时.get的一种典型应用。在它生成的页面中包括响应用户请求的内容以及从广告服务器上获得的广告。它将获取广告的任务提交给一个,然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间【传递给get的参数的计算方法是,将指定时限减去当前时间。这可能会得到负数,但java.util.中所有与时限相关的方法都将负数视为零,因此不需要额外的代码来处理这种情况。】。如果get超时,那么将取消【.的参数为true表示任务线程可以在运行过程中中断。请参见第7章。】广告获取任务,并转而使用默认的广告信息。

程序清单6-16在指定时间内获取广告信息

public class Renderer {Page renderPageWithAd() {long endNanos = System.nanoTime() + TIME_BUDGET;Future<Ad> f = exec.submit(new FetchAdTask());// 在等待广告的同时显示页面Page page = renderPageBody();Ad ad;try {// 只等待指定的时间长度long timeLeft = endNanos - System.nanoTime();ad = f.get(timeLeft.HANOSECONDS);} catch (ExecutionException ex) {ad = DEFAULT_AD;  } catch (TimeoutException e) {ad = DEFAULT_AD;f.cancel(true);}page.setAd(ad);return page;}  
}

6.3.8 示例 旅游预订门户网站

“预定时间”方法可以很容易地扩展到任意数量的任务上。考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用Web服务、访问数据库、执行一个EDI事务或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略它们,或者显示一个提示信息,例如“Did not hear from Air Java in time。”

从一个公司获得报价的过程与从其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到一个线程池,保留n个,并使用限时的get方法通过串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法——。

程序清单6-17使用了支持限时的,将多个任务提交到一个并获得结果。方法的参数为一组任务,并返回一组。这两个集合有着相同的结构。按照任务集合中迭代器的顺序将所有的添加到返回的集合中,从而使调用者能将各个与其表示的关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,将返回。当超过指定时限后,任何还未完成的任务都会取消。当返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get或来判断究竟是何种情况。

程序清单6-17在预定时间内请求旅游报价

public class QuoteTask implements Callable<TravelQuote> {private final TravelCompany company;private final TravelInfo travelInfo;public TrravelQuote call() throws Exception {return company.solicitQuote(travelInfo);  }
}
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companyes,Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException 
{List<QuoteTask> tasks = new ArrayList<>();for (TravelCompany company : companies) {tasks.add(new QuoteTask(company, travelInfo));  }List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);List<TravelQuote> quotes = new ArrayList<>(tasks.size());Iterator<QuoteTask> taskIter = tasks.iterator();for (Future<TravelQuote> f : futures) {QuoteTask task = taskIter.next();try {quotes.add(f.get());  } catch (ExecutionException ex) {quotes.add(task.getFailureQuote(ex.getCause()));} catch (CancellationException ex) {quotes.add(task.getTimeoutQuote(ex));  }}Collections.sort(quotes, ranking);return quotes;
}

小结

通过围绕任务执行来设计应用程序,可以简化开发过程,并有助于实现并发。框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。当需要创建线程来执行任务时,可以考虑使用。要想在将应用程序分解为不同的任务时获得最大的好处,必须定义清晰的任务边界。某些应用程序中存在着比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。

第7章 取消与关闭

任务和线程的启动很容易。在大多数时候,我们都会让它们运行直到结束,或者让它们自行停止。然而,有时候我们希望提前结束任务或线程,或许是因为用户取消了操作,或者应用程序需要被快速关闭。

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程。【虽然.stop和等方法提供了这样的机制,但由于存在着一些严重的缺陷,因此应该避免使用。请参见】。但它提供了中断(),这是一种协作机制,能够使一个线程终止另一个线程的当前工作。但它提供了中断(),这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。

生命周期结束(End-of-)的问题会使任务、服务以及程序的设计和实现等过程变得复杂,而这个在程序设计中非常重要的要素却经常被忽略。一个在行为良好的软件与勉强运行的软件之间的最主要区别就是,行为良好的软件能很完善地处理失败、关闭和取消等过程。本章将给出各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。

7.1 任务取消

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的()。取消某个操作的原因很多:

用户请求取消。用户点击图形界面程序中的“取消”按钮,或者通过管理接口来发出取消请求,例如JMX(Java )。

并发编程应用场景__并发编程模型

有时间限制的操作。例如,某个应用程序需要在有限时间内搜索问题空间,并在这个时间内选择最佳的解决方案。当计时器超时时,需要取消所有正在搜索的任务。

应用程序事件。例如,应用程序对某个问题空间进行分解并搜索,从而使不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案时,所有其他仍在搜索的任务都将被取消。

错误。网页爬虫程序搜索相关的页面,并将页面或摘要数据保存到硬盘。当一个爬虫任务发生错误时(例如,磁盘空间已满),那么所有搜索任务都会取消,此时可能会记录它们的当前状态,以便稍后重新启动。

关闭。当一个程序或服务关闭时,必须对正在处理和等待处理的工作执行某种操作。在平缓的关闭过程中,当前正在执行的任务将继续执行直到完成,而在立即关闭过程中,当前的任务则可能取消。

在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

其中一种协作机制能设置某个“已请求取消( )”标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。程序清单7-1中就使用了这项技术,其中的持续地枚举素数,直到它被取消。方法将设置标志,并且主循环在搜索下一个素数之前会首先检查这个标志。(为了使这个过程能可靠地工作,标志必须为类型。)

程序清单7-1使用类型的域来保存取消状态

public @ThreadSafe
class PrimeGenerator implements Runnable {@GuardedBy("this")private final List<BigInteger> primes = new ArrayList<>();private volatile boolean cancelled;@Override public void run() {BigInteger p = BigInteger.ONE;while (!cancelled) {p = p.nextProbablePrime();synchronized (this) {primes.add(p);}}}public void cancel() { cancelled = true; }public synchronized List<BigInteger> get() { return new ArrayList<>(primes); }
}

程序清单7-2给出了这个类的使用示例,即让素数生成器运行1秒钟后取消。素数生成器通常并不会刚好在运行1秒钟后停止,因为在请求取消的时刻和run方法中循环执行下一次检查之间可能存在延迟。方法由块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行。如果没有被调用,那么搜索素数的线程将永远运行下去,不断消耗CPU的时钟周期,并使得JVM不能正常退出。

程序清单7-2一个仅运行一秒钟的素数生成器

List<BigInteger> aSecondOfPrimes() throws InterruptedException {PrimeGenerator generator = new PrimeGenerator();new Thread(generator).start();try {SECONDS.sleep(1);} finally {generator.cancel();}return generator.get();
}

一个可取消的任务必须拥有取消策略( ),在这个策略中将详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

考虑现实世界中停止支付(Stop-)支票的示例。银行通常都会规定如何提交一个停止支付的请求,在处理这些请求时需要做出哪些响应性保证,以及当支付中断后需要遵守哪些流程(例如通知该事务中涉及的其他银行,以及对付款人的账户进行费用评估)。这些流程和保证放在一起就构成了支票支付的取消策略。

使用了一种简单的取消策略:客户代码通过调用来请求取消,在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。

7.1.1 中断

中的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。

在程序清单7-3中的就说明了这个问题。如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用方法来设置标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。

程序清单7-3不可靠的取消操作将把生产者置于阻塞的操作中(不要这么做)

class BrokenPrimeProducer extends Thread {private final BlockingQueue<BigInteger> queue;private volatile boolean cancelled = false;BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {this.queue = queue;}@Overridepublic void run() {try {BigInteger p = BigInteger.ONE;while (!cancelled) {queue.put(p = p.nextProbablePrime());}} catch (InterruptedException consumed) {}}public void cancel() { cancelled = true; }
}void consumePrimes() throws InterruptedException {BlockingQueue<BigInteger> primes = ...;BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);try {while (needMorePrimes()) {consume(primes.take());}} finally {producer.cancel();}
}

第5章曾提到,一些特殊的阻塞库的方法支持中断。线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

在Java的API或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在中包含了中断线程以及查询线程中断状态的方法,如程序清单7-4所示。方法能中断目标线程,而方法能返回目标线程的中断状态。静态的方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

程序清单7-中的中断方法

public class Thread {public void interrupt() { ... }public boolean isInterrupted() { ... }public static boolean interrupted() { ... }...
}

阻塞库方法,例如.sleep和.wait等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:清除中断状态,抛出,表示阻塞操作由于中断而提前结束。JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作变得“有粘性”----- 如果不触发,那么中断状态将一直保持,知道明确的清除中断状态。

通过并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正的中断一个正在运行的线程,而只是发出中断请求,然后由线程在一个合适的时刻中断自己。(这些时刻也被称为取消点。)有些方法,例如wait、sleep和join等,将严格处理这种请求,当它们在收到中断请求或者在开始执行时发现某个已经被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈的其他代码无法对中断请求作出响应。

在使用静态的时应该小心,因为它会清除当前线程的中断状态。如果在调用时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出,或者通过再次调用来恢复中断状态,如程序清单5-10所示。

说明了一些自定义的取消机制无法与可阻塞的库函数实现良好交互的原因。如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用许多库类中提供的中断支持。

通常,中断是实现取消的最合理方式。

中的问题很容易解决(和简化):使用中断而不是标志来请求取消,如程序清单7-5所示。在每次迭代循环中,有两个位置可以检测出中断:在阻塞的put方法调用中,以及在循环开始处查询中断状态时。由于调用了阻塞的put方法,因此这里并不一定需要进行显式的检测,但执行检测却会使对中断具有更高的响应性,因为它是在启动寻找素数任务之前检查中断的,而不是在任务完成之后。如果可中断的阻塞方法的调用频率并不高,不足以获得足够的响应性,那么显式地检测中断状态能起到一定的帮助作用。

程序清单7-5通过中断来取消

class PrimeProducer extends Thread {private final BlockingQueue<BigInteger> queue;PrimeProducer(BlockingQueue<BigInteger> queue) { this.queue = queue; } @Overridepublic void run() {try {BigInteger p = Integer.ONE;while (!Thread.currentThread().isInterrupted()) {queue.put(p = p.nextProbablePrime());}} catch () { /* 允许线程退出 */ }}public void cancel() { interrupt(); }
}

7.1.2 中断策略

正如任务中应该包含取消策略一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

最合理的中断策略是某种形式的线程级(-Level)取消操作或服务级(-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。

任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。(当你为一户人家打扫房屋时,即使主人不在,也不应该把在这段时间内收到的邮件扔掉,而应该把邮件收起来,等主人回来以后再交给他们处理,尽管你可以阅读他们的杂志。)

这就是为什么大多数可阻塞的库函数都只是抛出作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

当检查到中断请求时,任务并不需要放弃所有的操作——它可以推迟处理中断请求,并直到某个更合适的时刻。因此需要记住中断请求,并在完成当前任务后抛出或者表示已收到中断请求。这项技术能够确保在更新过程中发生中断时,数据结构不会被破坏。

任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将传递给调用者外还需要执行其他操作,那么应该在捕获之后恢复中断状态:

Thread.currentThread().interrupt();

正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭()方法。

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

批评者曾嘲笑Java的中断功能,因为它没有提供抢占式中断机制,而且还强迫开发人员必须处理。然而,通过推迟中断请求的处理,开发人员能制定更灵活的中断策略,从而使应用程序在响应性和健壮性之间实现合理的平衡。

7.1.3 响应中断

在5.4节中,当调用可中断的阻塞函数时,例如.sleep或.put等,有两种实用策略可用于处理:

传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。

恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

传递与将添加到子句中一样容易,如程序清单7-6中的所示。

程序清单7-6将传递给调用者

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException() {return queue.tak();
}

如果不想或无法传递(或许通过来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用来恢复中断状态。你不能屏蔽,例如在catch块中捕获到异常却不做任何处理,除非在你的代码中实现了线程的中断策略。虽然屏蔽了中断,但这是因为它已经知道线程将要结束,因此在调用栈中已经没有上层代码需要知道中断信息。由于大多数代码并不知道它们将在哪个线程中运行,因此应该保存中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获时恢复状态,如程序清单7-7所示。如果过早地设置中断状态,就可能引起无限循环,因为大多数可中断的阻塞方法都会在入口处检查中断状态,并且当发现该状态已被设置时会立即抛出。(通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快地响应中断)。

程序清单7-7不可取消的任务在退出前恢复中断

public Task getNextTask(BlockingQueue<Task> queue) {boolean interrupted = false;try {while (true) {try {return queue.take();} catch (InterruptedException ex) {interrupted = true;// 重新尝试}}} finally {if (interrupted) {Thread.currentThread().interrupt();}}
}

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)

例如,当一个由拥有的工作者线程检测到中断时,它会检查线程池是否正在关闭。如果是,它会在结束之前执行一些线程池清理工作,否则它可能创建一个新线程将线程池恢复到合理的规模。

7.1.4 示例 记时运行

许多问题永远也无法解决(例如,枚举所有的素数),而某些问题,能很快得到答案,也可能永远得不到答案。在这些情况下,如果能够指定“最多花10分钟搜索答案”或者“枚举出在10分钟内能找到的答案”,那么将是非常有用的。

程序清单7-2中的方法将启动一个,并在1秒钟后中断。尽管可能需要超过1秒的时间才能停止,但它最终会发现中断,然后停止,并使线程结束。在执行任务时的另一个方面是,你希望知道在任务执行过程中是否会抛出异常。如果在指定时限内抛出了一个未检查的异常,那么这个异常可能会被忽略,因为素数生成器在另一个独立的线程中运行,而这个线程并不会显式地处理异常。

在程序清单7-8中给出了在指定时间内运行一个任意的的示例。它在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为该异常会被的调用者捕获。

程序清单7-8在外部线程中安排中断(不要这么做)

private static final ScheduledExecutorService cancelExec = ...;
public static void timeRun(Runnable r, long timeout, TimeUnit unit) {final Thread taskThread = Thread.currentThread();cancelExec.schedule(new Runnable() {@Overridepublic void run() {taskThread.interrupt();}}, timeout, unit);r.run();
}

这是一种非常简单的方法,但却破坏了以下规则:在中断线程之前,应该了解它的中断策略。由于可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。如果任务在超时之前完成,那么中断所在线程的取消任务将在返回到调用者之后启动。我们不知道在这种情况下将运行什么代码,但结果一定是不好的。(可以使用返回的来取消这个取消任务以避免这种风险,这种做法虽然可行,但却非常复杂。)

而且,如果任务不响应中断,那么会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限)。如果某个限时运行的服务没有在指定的时间内返回,那么将对调用者带来负面影响。

在程序清单7-9中解决了的异常处理问题以及之前解决方案中的问题。执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。在启动任务线程之后,将执行一个限时的join方法。在join返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用的线程中再次抛出该异常。由于将在两个线程之间共享,因此该变量被声明为类型,从而确保安全地将其从任务线程发布到线程。

程序清单7-9在专门的线程中中断任务

public static void timeRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException {class RethrowableTask implements Runnable {private volatile Throwable t;@Overridepublic void run() {try { r.run();} catch (Throwable t) { this.t = t;}}void rethrow() {if (t != null) { throw launderThrowable(); }}}RethrowableTask task = new RethrowableTask();final Thread taskThread = new Thread(task);taskThread.start();cancelExec.schedule(new Runnable() {@Overridepublic void run() { taskThread.interrupt(); }}, timeout, unit);taskThread.join(unit.toMillis(timeout));task.rethrow();
}

在这个示例的代码中解决了前面示例中的问题,但由于它依赖于一个限时的join,因此存在着join的不足:无法知道执行控制是因为线程正常退出而返回还是因为join超时而返回。【这是 API的一个缺陷,因为无论join是否成功地完成,在Java内存模型中都会有内存可见性结果,但join本身不会返回某个状态来表明它是否成功。】

7.1.5 通过来实现取消

我们已经使用了一种抽象机制来管理任务的生命周期,处理异常,以及实现取消,即。通常,使用现有库中的类比自行编写更好,因此我们将继续使用和任务执行框架来构建。

.将返回一个来描述任务。拥有一个方法,该方法带有一个类型的参数g,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)如果g为true并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。

.将返回一个来描述任务。拥有一个方法,该方法带有一个类型的参数g,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)如果g为true并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。

程序清单7-10给出了另一个版本的:将任务提交给一个,并通过一个定时的.get来获得结果。如果get在返回时抛出了一个,那么任务将通过它的来取消。(为了简化代码,这个版本的在块中将直接调用.,因为取消一个已完成的任务不会带来任何影响。)如果任务在被取消前就抛出一个异常,那么该异常将被重新抛出以便由调用者来处理异常。在程序清单7-10中还给出了另一种良好的编程习惯:取消那些不再需要结果的任务。(在程序清单6-13和程序清单6-16中使用了相同的技术。)

程序清单7-10通过来取消任务

public static void timeRun(Runnable r, long timeout, TimeUnit unit)throws InterruptedException {Future<?> task = taskExec.submit(r);try {task.get(timeout, unit);} catch (TimeoutException ex) {// 接下来任务将被取消} catch (ExecutionException ex) {// 如果在任务中抛出了异常,那么重新抛出该异常throw launderThrowable(ex.getCause());  } finally {// 如果任务已经结束,那么执行取消操作也不会带来任何影响task.cancel(true); //如果任务正在运行,那么将被中断  }
}

当.get抛出或时,如果你知道不再需要结果,那么就可以调用.来取消任务。

7.1.6 处理不可中断的阻塞

在Java库中,许多可阻塞的方法都是通过提前返回或者抛出来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的 I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。

**Java.io包中的同步 I/O。**在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然和中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个。

**Java.io包中的同步I/O。**当中断一个正在上等待的线程时,将抛出并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出)。当关闭一个时,将导致所有在链路操作上阻塞的线程都抛出。大多数标准的都实现了。

**的异步I/O。**如果一个线程在调用.方法(在java.nio.中)时阻塞了,那么调用close或方法会使线程抛出ion并提前返回。

**获取某个锁。**如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock类中提供了方法,该方法允许在等待一个锁的同时仍能响应中断,请参见第13章。

程序清单7-11的给出了如何封装非标准的取消操作。管理了一个套接字连接,它采用同步方式从该套接字中读取数据,并将接收到的数据传递给。为了结束某个用户的连接或者关闭服务器,改写了方法,使其既能处理标准的中断,也能关闭底层的套接字。因此,无论线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。

程序清单7-11通过改写方法将非标准的取消操作封装在中

public class ReaderThread extends Thread {private final Socket socket;private final InputStream in;public ReaderThread(Socket socket) throws IOException {this.socket = socket;this.in = socket.getInputStream();}public void interrupt() {try {socket.close();} catch (IOException ignored) {} finally {super.interrupt();}}@Overridepublic void run() {try {byte[] buf = new byte[BUFZE];while (true) {int count = in.read(buf);if (count < 0) {break;} else if (count > 0) {processBuffer(buf, count);}}} catch (IOException ex) { /* 允许线程退出 */ }}
}

7.1.7 采用来封装非标准的取消

我们可以通过方法来进一步优化中封装非标准取消的技术,这是Java6在中的新增功能。当把一个提交给时,方法会返回一个,我们可以通过这个来取消任务。是一个工厂方法,它将创建来代表任务。还能返回一个接口,该接口扩展了和(并由实现)。

通过定制表示任务的可以改变.的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。通过改写方法,可以取消基于套接字的线程。

在程序清单7-12的中定义了一个接口,该接口扩展了,并增加了一个方法和一个工厂方法来构造。扩展了,并通过改写使得可以创建自己的。

程序清单7-12通过将非标准的取消操作封装在一个任务中

public interface CancellableTask<T> extends Callable<T> {void cancel();RunnableFuture<T> newTask();
}
@ThreadSafe
public class CancellingExecutor extends ThreadPoolExecutor {protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {if (callable instanceof CancellableTask)return ((CancellableTask<T>) callable).newTask();else return super.newTaskFor(callable);}
}
public abstract class SocketUsingTask<T> implements CancellableTask<T> {@GuardedBy("this")private Socket socket;protected synchronized void setSocket(Socket s) { socket = s; }public synchronized void cancel() {try {if (socket != null) { socket.close(); }} catch (IOException ignord) { }}public RunnableFuture<T> newTask() {return new FutureTask<T>(this) {@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {try {SocketUsingTask.this.cancel();}  finally {return super.cancel(mayInterruptIfRunning);}     }};}
}

实现了,并定义了.来关闭套接字和调用super.。如果通过其自己的来取消,那么底层的套接字将被关闭并且线程将被中断。因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻调的套接字I/O方法。

7.2 停止基于线程的服务

应用程序通常会创建拥有多个线程的服务,例如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。

正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。例如,中断线程或者修改线程的优先级等。在线程API中,并没有对线程所有权给出正式的定义:线程由对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法( )来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在中提供了和等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

7.2.1 示例 日志服务

在大多数服务器应用程序中都会用到日志,例如,在代码中插入语句就是一种简单的日志。像这样的字符流类是线程安全的,因此这种简单的方法不需要显式的同步【如果需要在单条日志消息中写入多行,那么要通过客户端加锁来避免多个线程不正确地交错输出。如果两个线程同时把多行栈追踪信息(Stack Trace)添加到同一个流中,并且每行信息对应一个调用,那么这些信息在输出中将交错在一起,看上去就是一些虽然庞大但却毫无意义的栈追踪信息。】。然而,在11.6节中,我们将看到这种内联日志功能会给一些高容量的()应用程序带来一定的性能开销。另外一种替代方法是通过调用log方法将日志消息放入某个队列中,并由其他线程来处理。

在程序清单7-13的中给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由通过将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(-, -)的设计方式:每个调用log的操作都相当于一个生产者,而后台的日志线程则相当于消费者。如果消费者的处理速度低于生产者的生成速度,那么将阻塞生产者,直到日志线程有能力处理新的日志消息。

程序清单7-13不支持关闭的生产者-消费者日志服务

class LogWriter {private final BlockingQueue<String> queue;private final LoggerThread logger;public LogWriter(Writer writer) {this.queue = new LinkedBlockingQueue<>(CAPACITY);this.logger = new LoggerThread(writer);}public void start() { logger.start(); }public void log(String msg) throws InterruptedException {queue.put(msg);}private class LoggerThread extends Thread {private final PrintWriter writer;...@Overridepublic void run() {try {while (true)writer.println(queue.take());} catch (InterruptedException ignored) {}finally {writer.close();}}}
}

为了使像这样的服务在软件产品中能发挥实际的作用,还需要实现一种终止日志线程的方法,从而避免使JVM无法正常关闭。要停止日志线程是很容易的,因为它会反复调用take,而take能响应中断。如果将日志线程修改为当捕获到时退出,那么只需中断日志线程就能停止服务。

然而,如果只是使日志线程退出,那么还不是一种完备的关闭机制。这种直接关闭的做法会丢失那些正在等待被写入到日志的信息,不仅如此,其他线程将在调用log时被阻塞,因为日志消息队列是满的,因此这些线程将无法解除阻塞状态。当取消一个生产者-消费者操作时,需要同时取消生产者和消费者。在中断日志线程时会处理消费者,但在这个示例中,由于生产者并不是专门的线程,因此要取消它们将非常困难。

另一种关闭的方法是:设置某个“已请求关闭”标志,以避免进一步提交日志信息,如程序清单7-14所示。在收到关闭请求后,消费者会把队列中的所有消息写入日志,并解除所有在调用log时阻塞的生产者。然而,在这个方法中存在着竞态条件问题,使得该方法并不可靠。log的实现是一种“先判读再运行”的代码序列:生产者发现该服务还没有关闭,因此在关闭服务后仍然会将日志消息放入队列,这同样会使得生产者可能在调用log时阻塞并且无法解除阻塞状态。可以通过一些技巧来降低这种情况的发生概率(例如,在宣布队列被清空之前,让消费者等待数秒),但这些都没有解决问题的本质,即使很小的概率也可能导致程序发生故障。

程序清单7-14 通过一种不可靠的方式为日志服务增加关闭支持

public void log(String msg) throws InterruptedException {if (!shutdownRequested) {queue.put(msg);} else {throw new IllegalStateException("Logger is shut down");}
}

为提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。我们采用方法是:通过原子方式来检查关闭请求,并且有条件的递增一个计数器来“保持”提交消息的权利,如程序清单7-15的所示。

程序清单7-15 向添加可靠的取消操作

public class LogService {private final BlockingQueue<String> queue;private final LoggerThread loggerThread;private final PrintWriter writer;@GuardedBy("this") private boolean isShutdown;@GuardedBy("this") private int reservations;public void start() { loggerThread.start(); }public void stop() { synchronized (this) { isShutdown = true; } loggerThread.interrupt();  }public void log(String msg) throws InterruptedException {synchronized (this) {if (isShutdown) {throw new IllegalStateException("...");  }++reservations;}queue.put(msg);}private class LoggerThread extends Thread {@Override  public void run() {try {while (true) {try {synchronized (LogService.this) {if (isShutdown && reservations == 0) {break;  }String msg = queue.take();synchronized (LogService.this) { -- reservations; }writer.println(msg);}} catch (InterruptedException ex) { /* retry */ }}} finally {writer.close();}  }  }
}

7.2.2 关闭

在6.2.4节中,我们看到提供了两种关闭方法:使用正常关闭,以及使用强行关闭。在进行强行关闭时,首先关闭当前正在执行的任务,然后返回所有尚未启动的任务清单。

这两种关闭方式的差别在于各自的安全性和响应性:强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束;而正常关闭虽然速度慢,但却更安全,因为会一直等到队列中的所有任务都执行完成后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。

简单的程序可以直接在main函数中启动和关闭全局的。而在复杂程序中,通常会将封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期方法,例如程序清单7-16中的一种变化形式,它将管理线程的工作委托给一个,而不是由其自行管理。通过封装,可以将所有权链( Chain)从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

程序清单7-16使用的日志服务

class LogService {private final ExecutorService exec = newSingleThreadExecutor();public void start() { }public void stop() throws InterruptedException {try {exec.shutdown();exec.awaitTermination(TIMEOUT, UNIT);} finally {writer.close();}}public void log(String msg) {try {exec.execute(new WriteTask(msg));  } catch (RejectedExecutionException ignored) { }}
}

7.2.3 "毒丸“对象

另一种关闭生产者-消费者服务的方式就是使用“毒丸( Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:”当得到这个对象时,立即停止。“ 在FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。在程序清单7-17、程序清单7-18和程序清单7-19中给出一个单生产者-单消费者的桌面搜索示例(来自程序清单5-8),在这个示例中使用了“毒丸”对象来关闭服务。

程序清单7-17通过“毒丸”对象来关闭服务

public class IndexingService {private static final File POISON = new File("");private final IndexerThread consumer = new IndexerThread();private final CrawlerThead producter = new CrawlerThead();private final BlockingQueue<File> queue;private final FileFilter fileFilter;private final File root;class CrawlerThead extends Thread { }class IndexerThread extends Thread { }public void start() { producter.start(); consumer.start(); }public void stop() { producter.interrupt(); }  public void awwitTermination() throws InterruptedException { consumer.join(); }  
}

程序清单7-的生产者线程

public class CrawlerThead extends Thread {@Overridepublic void run() {try {crawl(root);} catch (InterruptedException ex) { }finally {while (true) {try {queue.put(POISON);  break;} catch (InterruptedException ex) { }  }}  }private void crawl(File root) throws InterruptedException {  }}

程序清单7-的消费者线程

public class IndexerThread extends Thread {@Overridepublic void run() {try {while (true) {File file = queue.take();if (file == POISON) {break;  } else {indexFile(file);  }}} catch (InterruptedException ex) { }  }private void indexFile(File root) throws InterruptedException {  }}

只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象。在中采用的解决方案可以扩展到多个生产者:只需每个生产者都向队列中放入一个“毒丸”对象,并且消费者仅当在接收到个“毒丸”对象时才停止。这种方法也可以扩展到多个消费者的情况,只需生产者将个“毒丸”对象放入队列。然而,当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,“毒丸”对象才能可靠地工作。

7.2.4 示例 只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的来简化服务的生命周期管理,其中该的生命周期是由这个方法来控制的。(在这种情况下,和等方法通常会起较大的作用。)

程序清单7-20中的方法能在多台主机上并行地检查新邮件。它创建一个私有的,并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完成后,关闭并等待结束。【之所以采用来代替类型的,是因为能从内部的中访问标志,因此它必须是final类型以免被修改。】

程序清单7-20使用私有的,并且该的生命周期受限于方法调用

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException {ExecutorService exec = Executors.newCachedThreadPool();final AtomicBoolean hasNewMail = new AtomicBoolean(false);try {for (final String host : hosts) {exec.execute(new Runnable() {@Overridepublic void run() {if (checkMail(host)) {hasNewMail.set(true);  }}});  }} finally {exec.shutdown();exec.awaitTermination(timeout, unit);}return hasNewMail.get();}

7.2.5 的局限性

当通过来强行关闭时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。【返回的对象可能与提交给的对象并不相同:它们可能是被封装过的已提交任务。】

然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当关闭时哪些任务正在执行。【然而,在关闭过程中只会返回尚未开始的任务,而不会返回正在执行的任务。如果能返回所有这两种类型的任务,那么就不需要这种不确定的中间状态。】

程序清单7-21在中跟踪在关闭之后被取消的任务

public class TrackingExecutor extends AbstractExecutorService {private final ExecutorService exec;private final Set<Runnable> tasksCancelledAtShundown=Collections.synchronizedSet(new HashSet<>());public List<Runnable> getCancelledTasks() {if (!exec.isTerminated()) {throw new IllegalStateException("");}return new ArrayList<Runnable>(tasksCancelledAtShundown);}public void execute(final Runnable runnable) {exec.execute(new Runnable() {@Overridepublic void run() {try {runnable.run();} finally {if (isShutdown() && Thread.currentThread().isInterrupted()) {tasksCancelledAtShundown(runnable);}}}});}// 将ExecutorService的其他方法委托给exec
}

在程序清单7-22的中给出了的用法。网页爬虫程序的工作通常是无穷尽的,因此当爬虫程序必须关闭时,我们通常希望保存它的状态,以便稍后重新启动。提供了一个方法,该方法能找出正在处理的页面。当爬虫程序关闭时,无论是还没有开始的任务,还是那些被取消的任务,都将记录它们的URL,因此当爬虫程序重新启动时,就可以将这些URL的页面抓取任务加入到任务队列中。

程序清单7-22使用ice来保存未完成的任务以备后续执行

abstract class WebCrawler {private volatile TrackingExecutor exec;@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<>();public synchronized void start() { exec = new TrackingExecutor(Executors.newCachedThreadPool());  for (URL url : urlsToCrawl) {submitCrawlTask(url);  }urlsToCrawl.clear();}public synchronized void stop() throws InterruptedException {try {saveUncrawler(exec.shutdownNow());  if (exec.awaitTermination(TIMEOUT, UNIT)) {saveUncrawled(exec.getCancelledTasks());  }} finally {exec = null;  }}protected abstract List<URL> processPage(URL url);public void saveUncrawled(List<Runnable> uncrawled) {for (Runnable task : uncrawled) {urlsToCrawl.add(((CrawlTask) task).getPage());  }}private void submitCrawlTask(URL u) {exec.execute(new CrawlTask(u));}private class CrawlTask implements Runnable {private final URL url;public void run() {for (URL link : processPage(url)) {if (Thread.currentThread().isInterrupted()) {return;  }submitCrawlTask(link);}}public URL getPage() { return url; }}
}

在中存在一个不可避免的竞态条件,从而产生“误报”问题:一些被认为已取消的任务实际上已经执行完成。这个问题的原因在于,在任务执行最后一条指令以及线程池将任务记录为“结束”的两个时刻之间,线程池可能被关闭。如果任务是幂等的(,即将任务执行两次与执行一次会得到相同的结果),那么这不会存在问题,在网页爬虫程序中就是这种情况。否则,在应用程序中必须考虑这种风险,并对“误报”问题做好准备。

7.3 处理非正常的线程终止

当单线程的控制台程序由于发生了一个未捕获的异常而终止时,程序将停止运行,并产生与程序正常输出非常不同的栈追踪信息,这种情况是很容易理解的。然而,如果并发程序中的某个线程发生故障,那么通常并不会如此明显。在控制台中可能会输出栈追踪信息,但没有人会观察控制台。此外,当线程发生故障时,应用程序可能看起来仍然在工作,所以这个失败很可能会被忽略。幸运的是,我们有可以监测并防止在程序中“遗漏”线程的方法。

导致线程提前死亡的最主要原因就是。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。

线程非正常退出的后果可能是良性的,也可能是恶性的,这要取决于线程在应用程序中的作用。虽然在线程池中丢失一个线程可能会对性能带来一定影响,但如果程序能在包含50个线程的线程池上运行良好,那么在包含49个线程的线程池上通常也能运行良好。然而,如果在GUI程序中丢失了事件分派线程,那么造成的影响将非常显著——应用程序将停止处理事件并且GUI会因此失去响应。在第6章的中给出了由于遗漏线程而造成的严重后果:Timer表示的服务将永远无法使用。

任何代码都可能抛出一个。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常。对调用的代码越不熟悉,就越应该对其代码行为保持怀疑。

在任务处理线程(例如线程池中的工作者线程或者Swing的事件派发线程等)的生命周期中,将通过某种抽象机制(例如)来调用许多未知的代码,我们应该对在这些线程中执行的代码能否表现出正确的行为保持怀疑。像Swing事件线程这样的服务可能只是因为某个编写不当的事件处理器抛出而失败,这种情况是非常糟糕的。因此,这些线程应该在try-catch代码块中调用这些任务,这样就能捕获那些未检查的异常了,或者也可以使用try-代码块来确保框架能够知道线程非正常退出的情况,并做出正确的响应。在这种情况下,你或许会考虑捕获,即当通过这样的抽象机制来调用未知的和不可信的代码时。【这项技术的安全性存在着一些争议。当线程抛出一个未检查遗产时,整个应用程序都可能收到影响。但其替代方法----- 关闭整个应用程序,通常是更不切实际的】

在程序清单7-23中给出了如何在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者当前已有足够多的线程能满足需要。和Swing都通过这项技术来确保行为糟糕的任务不会影响到后续任务的执行。当编写一个向线程池提交任务的工作者线程类时,或者调用不可信的外部代码时(例如动态加载的插件),使用这些方法中的某一种可以避免某个编写得糟糕的任务或插件不会影响调用它的整个线程。

程序清单7-23典型的线程池工作者线程结构

public void run() {Throwable thrown = null;try {while (!isInterrupted) {runTask(getTaskFromWorkQueue());}} catch () {thrown = e;} finally {threadExited(this, thrown);}
}

未捕获异常的处理

上节介绍了一种主动方法来解决未检查异常。在 API中同样提供了-,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将二者结合在一起,就能有效地防止线程泄漏问题。

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的dler异常处理器(见程序清单7-24)。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到.err。【在Java 5.0之前,控制dler的唯一方法就是对进行子类化。在Java 5.0及之后的版本中,可以通过.为每个线程设置一个-,还可以使用来设置默认的-。然而,在这些异常处理器中,只有其中一个将被调用——JVM首先搜索每个线程的异常处理器,然后再搜索一个的异常处理器。中的默认异常处理器实现将异常处理工作逐层委托给它的上层,直至其中某个的异常处理器能够处理该未捕获异常,否则将一直传递到顶层的。顶层的异常处理器委托给默认的系统处理器(如果存在,在默认情况下为空),否则将把栈追踪信息输出到控制台。】

程序清单7-接口

public interface UncaughtExceptionHandler {void uncaughtException(Thread t, Throwable e);
}

异常处理器如何处理未捕获异常,取决于对服务质量的需求。最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入应用程序日志中,如程序清单7-25所示。异常处理器还可以采取更直接的响应,例如尝试重新启动线程,关闭应用程序,或者执行其他修复或诊断等操作。

程序清单7-25将异常写入日志的dler

public class UEHLogger implements Thread.UncaughtExceptionHandler {public void uncaughtException(Thread t, Throwable e) [Logger logger = Logger.getAnonymousLogger();logger.log(Level.SEVERE, "Thread terminated with exception:"+t.getName(), e);}
}

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

要为线程池中的所有线程设置一个dler,需要为-的构造函数提供一个。(与所有的线程操控一样,只有线程的所有者能够改变线程的dler。)标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个try-代码块来接收通知,因此当线程结束时,将有新的线程来代替它。如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。如果你希望在任务由于发生异常而失败时获得通知,并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的或中,或者改写的方法。

令人困惑的是,只有通过提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由提交的任务由于抛出了异常而结束,那么这个异常将被.get封装在中重新抛出。

7.4 JVM关闭

JVM既可以正常关闭,也可以强行关闭。正常关闭的触发方式有多种,包括:当最后一个“正常(非守护)”线程结束时,或者当调用了.exit时,或者通过其他特定于平台的方法关闭时(例如发送了信号或键入Ctrl-C)。虽然可以通过这些标准方法来正常关闭JVM,但也可以通过调用.halt或者在操作系统中“杀死”JVM进程(例如发送)来强行关闭JVM。

7.4.1 关闭钩子

在正常关闭中,JVM首先调用所有已注册的关闭钩子( Hook)。关闭钩子是指通过.注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。在关闭应用程序线程时,如果有(守护或非守护)线程仍然在运行,那么这些线程接下来将与关闭进程并发执行。当所有的关闭钩子都执行结束时,如果为true,那么JVM将运行终结器,然后再停止。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步机制,并且小心地避免发生死锁,这与其他并发代码的要求相同。而且,关闭钩子不应该对应用程序的状态(例如,其他服务是否已经关闭,或者所有的正常线程是否已经执行完成)或者JVM的关闭原因做出任何假设,因此在编写关闭钩子的代码时必须考虑周全。最后,关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM能尽快终止。

关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。在程序清单7-26中给出了如何使程序清单7-16中的在其start方法中注册一个关闭钩子,从而确保在退出时关闭日志文件。

由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。为了避免这种情况,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。实现这种功能的一种方式是对所有服务使用同一个关闭钩子(而不是每个服务使用一个不同的关闭钩子),并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了在关闭操作之间出现竞态条件或死锁等问题。无论是否使用关闭钩子,都可以使用这项技术,通过将各个关闭操作串行执行而不是并行执行,可以消除许多潜在的故障。当应用程序需要维护多个服务之间的显式依赖信息时,这项技术可以确保关闭操作按照正确的顺序执行。

程序清单7-26通过注册一个关闭钩子来停止日志服务

public void start() {Runtime.getRuntime().addShutdownHook(new Thread() {@Override public void run() {try { LogService.this.stop(); }catch (InterruptedException ignored) { }}});
}

7.4.2 守护线程

有时候,你希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM的关闭。在这种情况下就需要使用守护线程( )。

线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。

普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行代码块,也不会执行回卷栈,而JVM只是直接退出。

我们应尽可能少地使用守护线程——很少有操作能够在不进行清理的情况下被安全地抛弃。特别是,如果在守护线程中执行可能包含I/O操作的任务,那么将是一种危险的行为。守护线程最好用于执行“内部”任务,例如周期性地从内存的缓存中移除逾期的数据。

此外,守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期。

7.4.3 终结器

当不再需要内存资源时,可以通过垃圾回收器来回收它们,但对于其他一些资源,例如文件句柄或套接字句柄,当不再需要它们时,必须显式地交还给操作系统。为了实现这个功能,垃圾回收器对那些定义了方法的对象会进行特殊处理:在回收器释放它们后,调用它们的方法,从而保证一些持久化的资源被释放。

由于终结器可以在某个由JVM管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的。【请参阅(Boehm,2005)并了解在编写终结器时存在的各种困难。】在大多数情况下,通过使用代码块和显式的close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。基于这些原因以及其他一些原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)[EJ Item 6]。

避免使用终结器。

小结

在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现时的复杂性。Java并没有提供某种抢占式的机制来取消操作或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及能否始终遵循这些协议。通过使用和框架,可以帮助我们构建可取消的任务和服务。

第8章 线程池的使用

第6章介绍了任务执行框架,它不仅能简化任务与线程的生命周期管理,而且还提供一种简单灵活的方式将任务的提交与任务的执行策略解耦开来。第7章介绍了在实际应用程序中使用任务执行框架时出现的一些与服务生命周期相关的细节问题。本章将介绍对线程池进行配置与调优的一些高级选项,并分析在使用任务执行框架时需要注意的各种危险,以及一些使用的高级示例。

8.1 在任务与执行策略之间的隐形耦合

我们已经知道,框架可以将任务的提交与任务的执行策略解耦开来。就像许多对复杂过程的解耦操作那样,这种论断多少有些言过其实了。虽然框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。有些类型的任务需要明确地指定执行策略,包括:

依赖性任务。 大多数行为正确的任务都是独立的:它们不依赖于其他任务的执行时序、执行结果或其他效果。当在线程池中执行独立的任务时,可以随意地改变线程池的大小和配置,这些修改只会对执行性能产生影响。然而,如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生活跃性问题(请参见8.1.1节)。

使用线程封闭机制的任务。 与线程池相比,单线程的能够对并发性做出更强的承诺。它们能确保任务不会并发地执行,使你能够放宽代码对线程安全的要求。对象可以封闭在任务线程中,使得在该线程中执行的任务在访问该对象时不需要同步,即使这些资源不是线程安全的也没有问题。这种情形将在任务与执行策略之间形成隐式的耦合——任务要求其执行所在的是单线程的。如果将从单线程环境改为线程池环境,那么将会失去线程安全性。

对响应时间敏感的任务。 GUI应用程序对于响应时间是敏感的:如果用户在点击按钮后需要很长延迟才能得到可见的反馈,那么他们会感到不满。如果将一个运行时间较长的任务提交到单线程的中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该管理的服务的响应性。

使用的任务。 使每个线程都可以拥有某个变量的一个私有“版本”。然而,只要条件允许,可以自由地重用这些线程。在标准的实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来替代抛出异常的线程。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用才有意义,而在线程池的线程中不应该使用在任务之间传递值。

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。幸运的是,在基于网络的典型服务器应用程序中——网页服务器、邮件服务器以及文件服务器等,它们的请求通常都是同类型的并且相互独立的。

在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行策略而破坏安全性或活跃性。

8.1.1 线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的中,如果一个任务将另一个任务提交到同一个,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题。这种现象被称为线程饥饿死锁( ),只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

在程序清单8-1的中给出了线程饥饿死锁的示例。向提交了两个任务来获取网页的页眉和页脚,绘制页面,等待获取页眉和页脚任务的结果,然后将页眉、页面主体和页脚组合起来并形成最终的页面。如果使用单线程的,那么会经常发生死锁。同样,如果线程池不够大,那么当多个任务通过栅栏()机制来彼此协调时,将导致线程饥饿死锁。

程序清单8-1在单线程中任务发生死锁(不要这么做)

class ThreadDeadlock {ExecutorService exec = Executors.newSingleThreadExecutor();class RenderPageTask implements Callable<String> {@Overridepublic String call() throws Exception {Future<String> header, footer;header = exec.submit(new LoadFileTask("header.html"));footer = exec.submit(new LoadFileTask("footer.html"));String page = renderBody();// 将发生死锁,由于任务在等待子任务的结果return header.get() + page + footer.get();  }}
}

每当提交了一个有依赖性的任务时,要清楚地知道可能会出现线程“饥饿”死锁,因此需要在代码或配置的配置文件中记录线程池的大小限制或配置限制。

除了在线程池大小上的显式限制外,还可能由于其他资源上的约束而存在一些隐式限制。如果应用程序使用一个包含10个连接的JDBC连接池,并且每个任务需要一个数据库连接,那么线程池就好像只有10个线程,因为当超过10个任务时,新的任务需要等待其他任务释放连接。

8.1.2 运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。

有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如.join、.put、.await以及.等。如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。这样,无论任务的最终结果是否成功,这种办法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。如果在线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小。

8.2 设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据.来动态计算。

幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。

要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大的内存?任务是计算密集型、I/O密集型还是二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。

对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费。)对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。这种估算不需要很精确,并且可以通过一些分析或监控工具来获得。你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。

给定下列定义:

要使处理器达到期望的使用率,线程池的最优大小等于:

可以通过来获得CPU的数目:

int N_CPUS = Runtime.getRuntime().availableProcessors();

当然,CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。

当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会相互影响。如果每个任务都需要一个数据库连接,那么连接池的大小就限制了线程池的大小。同样,当线程池中的任务是数据库连接的唯一使用者时,那么线程池的大小又将限制连接池的大小。

8.3 配置

为一些提供了基本的实现,这些是由中的、和等工厂方法返回的。是一个灵活的、稳定的线程池,允许进行各种定制。

如果默认的执行策略不能满足需求,那么可以通过的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。定义了很多构造函数,在程序清单8-2中给出了最常见的形式。

程序清单8-的通用构造函数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { // ... }

8.3.1 线程的创建与销毁

线程池的基本大小(Core Pool Size)、最大大小( Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时【在创建初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用ds。】线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程【开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。然而,如果在线程池中没有使用作为其工作队列(例如在中就是如此),那么这种方式将产生一些奇怪的行为。如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下才会创建新的线程。因此,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常并不是我们所希望的。在Java 6中,可以通过ut来使线程池中的所有线程超时。对于一个大小有限的线程池并且在该线程池中包含一个工作队列,如果希望这个线程池在没有任务的情况下能销毁所有线程,那么可以启用这个特性并将基本大小设置为零。】。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。(显然,这是一种折衷:回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)

工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。工厂方法将线程池的最大大小设置为.,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式的构造函数来构造。

8.3.2 管理队列任务

在有限的线程池中会限制可并发执行的任务数量。(单线程的是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。)

在6.1.2节中曾介绍,如果无限制地创建线程,那么将导致不稳定性,并通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。在线程池中,这些请求会在一个由管理的队列中等待,而不会像线程那样去竞争CPU资源。通过一个和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能会耗尽资源。

即使请求的平均到达速率很稳定,也仍然会出现请求突增的情况。尽管队列有助于缓解任务的突增问题,但如果任务持续高速地到来,那么最终还是会抑制请求的到达率以避免耗尽内存。【这类似于通信网络中的流量控制:可以缓存一定数量的数据,但最终需要通过某种方式来告诉发送端停止发送数据,或者丢弃过多的数据并希望发送端在空闲时重传被丢弃的数据。】甚至在耗尽内存之前,响应性能也将随着任务队列的增长而变得越来越糟。

允许提供一个来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交( )。队列的选择与其他的配置参数有关,例如线程池的大小等。

和tor在默认情况下将使用一个无界的。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

一种更稳妥的资源管理策略是使用有界队列,例如、有界的、e。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略[ ]可以解决这个问题。请参见8.3.3节。)在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用来避免任务排队,以及直接将任务从生产者移交给工作者线程。不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作者线程从队列中提取该任务。只有当线程池是无界的或者可以拒绝任务时,才有实际价值。在工厂方法中就使用了。

当使用像或这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用e,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或(如果任务实现了)来定义的。

对于, 工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能【这种性能差异是由于使用了而不是。在Java 6中提供了一个新的非阻塞算法来替代,与Java 5.0中的相比,该算法把基准的吞吐量提高了3倍( et al.,2006)。】。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,就像在接受网络客户请求的服务器应用程序中,如果不进行限制,那么很容易发生过载问题。

只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如【对于提交其他任务并等待其结果的任务来说,还有另一种配置方法,就是使用有界的线程池,并使用作为工作队列,以及“调用者运行(-Runs)”饱和策略。】。

8.3.3 饱和策略

当有界队列被填满后,饱和策略开始发挥作用。的饱和策略可以通过调用来修改。(如果某个任务被提交到一个已被关闭的时,也会用到饱和策略。)JDK提供了几种不同的dler实现,每种实现都包含有不同的饱和策略:、、和。

“中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的-。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到队列中等待执行时,“抛弃()”策略会悄悄抛弃该任务。“抛弃最旧的(-)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先级队列放在一起使用。)

“调用者运行(-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了的线程中执行该任务。我们可以将示例修改为使用有界队列和“调用者运行”饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用时在主线程中执行。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在这期间,主线程不会调用,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。

当创建时,可以选择饱和策略或者对执行策略进行修改。程序清单8-3给出了如何创建一个固定大小的线程池,同时使用“调用者运行”饱和策略。

序清单8-3创建一个固定大小的线程池,并采用有界队列以及“调用者运行”饱和策略

ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, OL,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunPolicy());  

当工作队列被填满后,没有预定义的饱和策略来阻塞。然而,通过使用(信号量)来限制任务的到达率,就可以实现这个功能。在程序清单8-4的中给出了这种方法。该方法使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。

程序清单8-4使用来控制任务的提交速率

@ThreadSafe
public class BoundedExecutor {private final Executor exec;private final Semaphore semaphore;public BoundedExecutor(Executor exec, Semaphore semaphore) {this.exec = exec;this.semaphore = semaphore;}public void submitTask(final Runnable command) throws InterruptedException {semaphore.acquire();try {exec.execute(new Runnable() {@Overridepublic void run() {try {command.run();} finally {semaphore.release();  }}});} catch (Exception ex) {semaphore.release();  }}
}

8.3.4 线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法(请参见程序清单8-5)来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在中只定义了一个方法,每当线程池需要创建一个新线程时都会调用这个方法/

然而,在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池中的线程指定一个dler,或者实例化一个定制的类用于执行调试信息的记录。你还可能希望修改线程的优先级(这通常并不是一个好主意。请参见10.3.1节)或者守护状态(同样,这也不是一个好主意。请参见7.4.2节)。或许你只是希望给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。

程序清单8-接口

public interface ThreadFactory {Thread newThread(Runnable r);
}

在程序清单8-6的中给出了一个自定义的线程工厂。它创建了一个新的实例,并将一个特定于线程池的名字传递给的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程。在应用程序的其他地方也可以使用,以便所有线程都能使用它的调试功能。

程序清单8-6自定义的线程工厂

public class MyThreadFactory implements ThreadFactory {private final String poolName;public MyThreadFactory(String poolName) {this.poolName = poolName;}public Thread newThread(Runnable runable) {return new MyThreadFactory(runnable, poolName);}
}

在中还可以定制其他行为,如程序清单8-7所示,包括:为线程指定名字,设置自定义dler向中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或者终止时把调试消息写入日志。

程序清单8-7 定制基类

public class MyAppThread extends Thread {private static final String DEFAULT_NAME = "myAppThread";private static volatile boolean debugLifecycle = false;private static final AtomicInteger created = new AtomicInteger();private static final AtomicInteger alive = new AtomicInteger();private static final Logger log = Logger.getAnonymousLogger();public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }public MyAppThread(Runnable runnable, String name) {super(runnable, name+"-"+created.incrementAndGet());setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {@Overridepublic void uncaughtException(Thread t, Throwable e) {log.log(Level.SEVERE,"UNCAUGHT in thread"+t.getName(), e);  }});}@Overridepublic void run() {// 复制debug标志以确保一致的值boolean debug = debugLifecycle;if (debug) { log.log(Level.FINE, "Created " + getName()); }try {alive.incrementAndGet();super.run();} finally {alive.decrementAndGet();if (debug) { log.log(Level.FINE, "Exiting " + getName()); }}}public static int getThreadCreated() { return created.get(); }public static int getThreadsAlive() { return alive.get(); }public static boolean getDebug() { return debugLifecycle; }public static void setDebug(boolean b) { debugLifecycle = b; }  
}

如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过中的ory工厂来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建ory的线程拥有相同的访问权限、和。如果不使用ory,线程池创建的线程将从在需要新线程时调用或的客户程序中继承访问权限,从而导致令人困惑的安全性异常。

8.3.5 在调用构造函数后再定制

在调用完的构造函数后,仍然可以通过设置函数()来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器( ))。如果是通过中的某个(tor除外)工厂方法创建的,那么可以将结果的类型转换为以访问设置器,如程序清单8-8所示。

程序清单8-8对通过标准工厂方法创建的进行修改

ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor) {((ThreadPoolExecutor) exec).setCorePoolSize(10);
} else {throw new AssertionError("Oops, bad assumption");
}

在中包含一个工厂方法,该方法对一个现有的进行包装,使其只暴露出的方法,因此不能对它进行配置。tor返回按这种方式封装的,而不是最初的。虽然单线程的实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程的线程池大小,那么将破坏它的执行语义。

你可以在自己的中使用这项技术以防止执行策略被修改。如果将暴露给不信任的代码,又不希望对其进行修改,就可以通过来包装它。

8.4 扩展

是可扩展的,它提供了几个可以在子类化中改写的方法:、和,这些方法可以用于扩展的行为。

在执行任务的线程中将调用和等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,都会被调用。(如果任务在完成后带有一个Error,那么就不会调用。)如果抛出一个,那么任务将不被执行,并且也不会被调用。

在线程池完成关闭操作时调用,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。可以用来释放在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集统计信息等操作。

示例 给线程池添加统计信息

在程序清单8-9的中给出了一个自定义的线程池,它通过、和等方法来添加日志记录和统计信息收集。为了测量任务的运行时间,必须记录开始时间并把它保存到一个可以访问的地方。因为这些方法将在执行任务的线程中调用,因此可以把值保存到一个变量中,然后由来读取。在中使用了两个变量,分别用于记录已处理的任务数和总的处理时间,并通过来输出包含平均任务时间的日志消息。

程序清单8-9增加了日志和计时等功能的线程池

public class TimingThreadPool extends ThreadPoolExecutor {private final ThreadLocal<Long> startTime = new ThreadLocal<>();private final Logger log = Logger.getLogger("TimingThreadPool");private final AtomicLong numTasks = new AtomicLong();private final AtomicLong totalTime = new AtomicLong();protected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t ,r);log.fine(String.format("Thread %s: start %s", t, r));startTime.set(System.nanoTime());}protected void afterExecute(Runnable r, Throwable t) {try {long endTime = System.nanoTime();long taskTime = endTime - startTime.get();numTasks.incrementAndGet();totalTime.addAndGet(taskTime);log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));} finally {super.afterExecute(r, t);  }  }protected void terminated() {try {log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));  } finally {super.terminated();  }  }
}

8.5 递归算法的并行化

我们对6.3节的页面绘制程序进行了一系列的改进以便不断发掘可利用的并行性。第一次是使程序完全串行执行,第二次虽然使用了两个线程,但仍然是串行地下载所有图像:在最后一次实现中将每个图像的下载操作视为一个独立任务,从而实现了更高的并行性。如果在循环体中包含了一些密集计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行化。

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用将串行循环转化为并行循环,在程序清单8-10的和中给出了这种方法。

程序清单8-10将串行执行转换为并行执行

void processSequentially(List<Element> elments) {for (Element e : elements) process(e);
}
void processInParallel(Executor exec, List<Elemet> elements) {for (final Element e: elements) {exec.execute(new Runnable() {@Override public void run() { process(e); }});  }
}

调用比调用能更快地返回,因为会在所有下载任务都进入了的队列后就立即返回,而不会等待这些任务全部完成。如果需要提交一个任务集并等待它们完成,那么可以使用.,并且在所有任务都执行完成后调用来获取结果,如第6章的所示。

当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化。

在一些递归设计中同样可以采用循环并行化的方法。在递归算法中通常都会存在串行循环,而且这些循环可以按照程序清单8-10的方式进行并行化。一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。例如,程序清单8-11的用深度优先算法遍历一棵树,在每个节点上执行计算并将结果放入一个集合。修改后的同样执行深度优先遍历,但它并不是在访问节点时进行计算,而是为每个节点提交一个任务来完成计算。

程序清单8-11将串行递归转换为并行递归

public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {for (Node<T> n : nodes) {results.add(n.compute());  sequentialRecursive(n.getChildren(), results);}
}
public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {for (final Node<T> n : nodes) {exec.execute(new Runnable() {@Overridepublic void run() {results.add(n.compute());  }});  parallelRecursive(exec, n.getChildren(), results);}
}

当返回时,树中的各个节点都已经访问过了(但是遍历过程仍然是串行的,只有调用才是并行执行的),并且每个节点的计算任务也已经放入的工作队列。的调用者可以通过以下方式等待所有的结果:创建一个特定于遍历过程的,并使用和等方法,如程序清单8-12所示。

程序清单8-12等待通过并行方式计算的结果

 public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {ExecutorService exec = Executors.newCachedThreadPool();Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();parallelRecursive(exec, nodes, resultQueue);exec.shutdown();exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);return resultQueue;}

示例 谜题框架 第9章 图形用户界面应用程序

9.1 为什么GUI是单线程的 9.1.1 串行事件处理 9.1.2 Swing中的线程封闭机制 9.2 短时间的GUI任务 9.3 长时间的GUI任务 9.3.1 取消 9.3.2 进度标识和完成标识 9.3.3 9.4 共享数据模型 9.4.1 线程安全的数据模型 9.4.2 分解数据模型 9.5 其他形式的单线程子系统

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了