引言
最近在写公共线程池工具类时,发现 new ThreadPoolExecutor时候 总得给出一个线程池的拒绝策略。 故此针对任务需要:给出一下拒绝策略。 - getSingleThreadPool
和getSingleCall
方法使用了DiscardOldestPolicy
策略,当任务被拒绝时,它会丢弃队列中等待时间最长的任务,然后尝试提交当前任务。
- getAssign
方法使用了DiscardPolicy
策略,当任务被拒绝时,它会默默地丢弃被拒绝的任务,不做任何处理。
java
private static final ConcurrentHashMap<String, ThreadPoolExecutor> ASSIGN = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, ThreadPoolExecutor> POOL_MAP = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, ThreadPoolExecutor> CALL = new ConcurrentHashMap<>();
public static ThreadPoolExecutor getSingleThreadPool(String s) {
synchronized (POOL_MAP) {
ThreadPoolExecutor threadPoolExecutor = POOL_MAP.get(s);
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
new CustomizableThreadFactory("ARRIVE-" + s),
new ThreadPoolExecutor.DiscardOldestPolicy());
POOL_MAP.put(s, threadPoolExecutor);
}
return threadPoolExecutor;
}
}
public static ThreadPoolExecutor getSingleCall(String s) {
synchronized (CALL) {
ThreadPoolExecutor threadPoolExecutor = CALL.get(s);
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
new CustomizableThreadFactory("AUTO-" + s),
new ThreadPoolExecutor.DiscardOldestPolicy());
CALL.put(s, threadPoolExecutor);
}
return threadPoolExecutor;
}
}
public static ThreadPoolExecutor getAssign(String s) {
synchronized (ASSIGN) {
ThreadPoolExecutor threadPoolExecutor = ASSIGN.get(s);
if (threadPoolExecutor == null) {
threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1),
new CustomizableThreadFactory("ASSIGN-" + s),
new ThreadPoolExecutor.DiscardPolicy());
ASSIGN.put(s, threadPoolExecutor);
}
return threadPoolExecutor;
}
}
文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
正文
针对以上使用,故此想学习了解一下所有拒绝策略:文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
在高并发的Java应用开发中,线程池是提升性能和资源利用率的重要工具。它能够有效管理线程生命周期,控制并发数,以及减少线程创建和销毁的开销。然而,当系统负载急剧增加,超出线程池处理能力时,如何优雅地处理多余的任务就显得尤为关键。这便引出了一个高级话题——线程池的拒绝策略。本文将深入探讨Java线程池中拒绝策略的内涵、种类及其在实际开发中的应用技巧,帮助开发者在面对极端情况时,能够巧妙地设计和选择合适的策略,确保应用的鲁棒性和稳定性文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
Java中的线程池ThreadPoolExecutor
提供了四种内置的拒绝策略,当任务被提交到线程池,但线程池无法接受任务时,这些策略会被触发。以下是这四种拒绝策略的详细介绍和分析:文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
1. AbortPolicy(中止策略)
描述:这是默认的拒绝策略。当任务被拒绝时,AbortPolicy
策略会抛出一个RejectedExecutionException
异常。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
分析:这种策略直接给予反馈,明确告知任务无法被执行。它适用于那些希望立即知道任务未能成功提交的场景。然而,这种策略可能会导致系统不稳定,因为异常需要被上层逻辑适当处理。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
适用场景:当任务提交者需要立即知道任务被拒绝,并且愿意处理这种异常情况时。通常用于快速失败的场景,比如启动时的初始化任务。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
2. CallerRunsPolicy(调用者运行策略)
描述:这种策略不会抛出异常。相反,当任务被拒绝时,任务会被提交给正在执行execute
方法的线程来运行。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
分析:这种策略提供了一种简单的反馈机制,可以减缓新任务的提交速度。它允许调用者线程直接运行任务,从而提供了一种减压机制。但是,这可能会影响到调用者线程原本的执行流程,尤其是当调用者线程本身是UI线程或者其他关键线程时。 - 适用场景:适合于降低新任务提交速度的场合。例如,当任务是由某些限制并发的控制器提交时,可以让控制器线程自己执行任务,从而减缓任务提交速度。文章源自灵鲨社区-https://www.0s52.com/bcjc/javajc/16433.html
3. DiscardPolicy(丢弃策略)
描述:这种策略默默地丢弃无法处理的任务,不做任何处理也不抛出异常。
分析:这是一种最为简单的策略,因为它不提供任何反馈。该策略可能会导致任务的丢失,因为提交的任务没有得到任何处理。在某些场景下,如果任务的丢失不会造成严重后果,可以使用此策略。 - 适用场景:当任务可以被丢弃且不影响系统整体功能时,例如在某些非关键的日志记录或统计信息更新上。
4. DiscardOldestPolicy(丢弃最旧策略)
描述:这种策略将丢弃最早的未处理任务(队列中等待最久的任务),然后尝试重新提交新任务。
分析:这种策略试图为新提交的任务腾出空间,但在高负载的情况下可能会丢弃大量的任务。它适用于那些任务时效性较高,可以接受放弃一些旧任务的场景。 - 适用场景:适用于需要处理更多新鲜数据,而不是积压数据的场景。这种策略通常用于实时性要求较高的任务执行。
在选择合适的拒绝策略时,需要根据实际应用的需求和特点来决定。例如,如果任务对时效性要求不高,可以选择CallerRunsPolicy
策略;如果系统对稳定性要求极高,不希望因为任务拒绝而出现异常,可以选择DiscardOldestPolicy
或DiscardPolicy
策略。无论选择哪种策略,都应该在系统设计阶段就考虑好任务拒绝的可能性,并确保系统能够以适当的方式处理这种情况。
以上是ThreadPoolExecutor 自带四种拒绝策略,如果觉得不合适可以自己实现一个:
5.自定义拒绝策略
需要创建一个实现了RejectedExecutionHandler
接口的类。这个接口定义了一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor)
, 需要在这个方法中定义当任务被拒绝时的行为。 下面是一个自定义拒绝策略的示例,该策略会记录被拒绝任务的信息,并尝试在一定时间后重新提交任务到线程池:
java
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录被拒绝的任务信息
System.err.println("任务被拒绝: " + r.toString());
// 可以在这里实现更复杂的逻辑,例如记录日志、发送通知等
// 如果线程池未关闭,尝试在一定时间后重新提交任务
if (!executor.isShutdown()) {
try {
// 等待一段时间
TimeUnit.SECONDS.sleep(1);
// 重新提交任务
executor.execute(r);
} catch (InterruptedException e) {
// 线程被中断的处理
Thread.currentThread().interrupt();
}
}
}
}
要使用这个自定义的拒绝策略,需要在创建ThreadPoolExecutor
时将其作为参数传入:
java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, // 核心线程数
1, // 最大线程数
0L, // 保持活动时间
TimeUnit.MILLISECONDS, // 时间单位
new ArrayBlockingQueue<>(1), // 工作队列
new MyRejectedExecutionHandler() // 自定义拒绝策略
);
// 提交任务
for (int i = 0; i < 10; i++) {
final int index = i;
threadPoolExecutor.execute(() -> {
try {
// 假设每个任务执行需要一段时间
TimeUnit.SECONDS.sleep(2);
System.out.println("任务执行: " + index);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
threadPoolExecutor.shutdown();
}
}
在这个例子中,当任务被拒绝时,自定义的拒绝策略会首先打印出被拒绝任务的信息,然后等待1秒钟,尝试重新将任务提交到线程池中。当然,可以根据实际需求定制更加复杂的拒绝策略,例如将任务存入持久化队列、限流、降级处理等。
总结:
通过本文的深入分析,了解了线程池拒绝策略的重要性和实践意义。从AbortPolicy的快速失败,到CallerRunsPolicy的调用者运行,再到DiscardPolicy与DiscardOldestPolicy的不同丢弃方式,每种内置策略都有其适用场景和潜在影响。此外,还探讨了自定义拒绝策略的设计思路,包括日志记录、任务持久化、等待队列、优先级调整和流量控制等方案。这些自定义策略为处理任务溢出提供了更多的灵活性和可控性。线程池和拒绝策略的合理应用,不仅能够提升系统的并发处理能力,还能在高负载情况下保障系统的稳定运行,是高质量Java应用不可或缺的一部分。希望读者能够根据本文的内容,结合自己的业务需求,设计出既合理又高效的线程池拒绝策略,为构建高可用的Java应用奠定坚实的基础。
评论