本文介绍了一下Java中关于线程调度的线程池的相关内容。本来以
线程池中拥有有限数目的线程,但是其中每一个线程都可以依次运行多个对象。为什么要有线程池?当要处理的单个任务处理的时间很短而请求的数目却是巨大的时。为每个请求创建一个新线程的开销很大,为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多。
一、线程池的简单模拟实现
实现代码:
package com.wly.javathread.chap7;
import java.util.LinkedList;
/**
* 线程池的一种任务队列式的实现,实现的关键在于对"任务队列"进行锁定
* @author wly
*
*/
public class WorkQueue {
private final int nThreads;
private final PoolWorker[] threads;
private final LinkedList queue;
public static void main(String[] args) {
WorkQueue wq = new WorkQueue(3);
Runnable r1 = new Runnable() {
@Override
public void run() {
int i = 0;
while(i < 5) {
System.out.println("r1:" + i + "|" + "Tid:" + Thread.currentThread().getId());
i ++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
int i = 0;
while(i < 5) {
System.out.println("r2:" + i + "|" + "Tid:" + Thread.currentThread().getId());
i ++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Runnable r3 = new Runnable() {
@Override
public void run() {
int i = 0;
while(i < 5) {
System.out.println("r3:" + i + "|" + "Tid:" + Thread.currentThread().getId());
i ++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Runnable r4 = new Runnable() {
@Override
public void run() {
int i = 0;
while(i < 5) {
System.out.println("r4:" + i + "|" + "Tid:" + Thread.currentThread().getId());
i ++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Runnable r5 = new Runnable() {
@Override
public void run() {
int i = 0;
while(i < 5) {
System.out.println("r5:" + i + "|" + "Tid:" + Thread.currentThread().getId());
i ++;
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
wq.execute(r1);
wq.execute(r2);
wq.execute(r3);
wq.execute(r4);
wq.execute(r5);
}
public WorkQueue(int nThreads) {
this.nThreads = nThreads;
queue = new LinkedList();
threads = new PoolWorker[nThreads];
for (int i = 0; i < nThreads; i++) {
threads[i] = new PoolWorker();
threads[i].start();
}
}
public void execute(Runnable r) {
synchronized (queue) { //对任务队列进行锁定
queue.addLast(r);
queue.notify(); //唤醒锁对象
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized (queue) { //对任务队列进行锁定
while (queue.isEmpty()) { //当任务队列为空时,使其进入"等待"状态
try {
queue.wait();
} catch (InterruptedException ignored) {
}
}
r = (Runnable)queue.removeFirst(); //开始队列首位的Runnable,并将其从任务队列中移除
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
} catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
运行结果:
r1:0|Tid:8 r3:0|Tid:9 r2:0|Tid:10 r3:1|Tid:9 r1:1|Tid:8 r2:1|Tid:10 r1:2|Tid:8 r3:2|Tid:9 r2:2|Tid:10 r1:3|Tid:8 r3:3|Tid:9 r2:3|Tid:10 r2:4|Tid:10 r1:4|Tid:8 r3:4|Tid:9 r4:0|Tid:9 r5:0|Tid:8 r5:1|Tid:8 r4:1|Tid:9 r5:2|Tid:8 r4:2|Tid:9 r5:3|Tid:8 r4:3|Tid:9 r4:4|Tid:9 r5:4|Tid:8
从运行结果可以看,基本实现了线程池的任务调度的功能。不过这里线程对象没有实现自动销毁功能,是常驻内存的。大致流程:有几个固定的线程对象从任务(Runnable)队列中拿到Runnable对象,然后运行,运行完成后,会再尝试去拿新的任务,如此循环直到没有任务可做为止。
代码来源:http://blog.csdn.net/preterhuman_peak/article/details/7561635
二、Java自带的线程池的使用
稍微介绍一些Java自带的线程池中常用的一些知识,当然笔者也是一边学习一边写博客的,所以也没有很深入的研究。就权当学习笔记了。
Java中线程池相关的类都在并发包concurrent中,本文讨论的线程池主要涉及的类有:RejectedExecutionHandler、ThreadFactory、ThreadPoolExecutor以及ArrayBlockingQueue。其中的ThreadPoolExecutor就是线程池的主角,负责处理任务以及任务的调度;RejectedExecutionHandler是一个负责处理被线程池拒绝的任务对象(Runnable);ThreadFactory是一个线程工厂,可以使用Executors.defaultThreadFactory()来便捷的得到默认线程工厂;ArrayBlockingQueue是用来存放任务对象的数据容器,与其相似的还有LinkedBlockingQueue、SynchronousQueue、PriorityBlockQueue,本文使用的ArrayBlockingQueue,从名字中的Array就可以看出其是"有界"的。
先上代码,看运行结果,再来具体的讨论一下吧!
STEP 1.新建RejectedExecutionHandler的实现类RejectedExecutionHandlerImpl:
package com.wly.javathread.threadpool;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
//Method that may be invoked by a ThreadPoolExecutor when execute cannot accept a task.
//This may occur when no more threads or queue slots are available because their bounds would be exceeded,
//or upon shutdown of the Executor. In the absence of other alternatives, the method may throw an unchecked
//RejectedExecutionException, which will be propagated to the caller of execute.
//当一个ThreadPoolExecutor拒绝接受一个任务(task)时可能会调用该方法。当可执行线程数或者任务队列将要超出边界时会触发本方法或者在关闭
//Executor时可能会触发本方法。在没有意外的情况下,这个方法会抛出一个RejectedExecutionException给这个任务的调用者。
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}
}
STEP 2.新建任务类WorkerThread
package com.wly.javathread.threadpool;
public class WorkerThread implements Runnable {
private String command;
public WorkerThread(String s){
this.command=s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " End.");
}
private void processCommand() {