为什么需要线程池?因为线程是一个计算机的重要资源,创建,启动,销毁线程都是比较耗费计算机资源的,而且线程是有限的,当有时候请求过多的时候,不可能根据请求的多少而创建多少个线程,所以就需要线程池对线程进行管理和重复利用。
通过上面的介绍线程的特点可以推断出,一个完整的线程池应该具有以下功能:
通过上面的定义,就可以自己写一个线程池,首先是接口类:
线程池接口,定义了线程池的一些方法,其中最主要的是execute,shutdown,后面主要实现这两个方法。
public interface ThreadPool {
//提交任务到线程池
void execute(Runnable runnable);
//关闭
void shutdown();
//获取线程池初始化时的线程大小
int getInitSize();
//获取线程池最大线程输
int getMaxSize();
//获取线程池核心线程数量
int getCoreSize();
//获取活跃线程数量
int getActiveCount();
//获取线程池缓存队列大小
int getQueueSize();
//查看线程是否被销毁
boolean isShutdown();
}
//缓存提交到线程池的队列任务
public interface RunnableQueue {
//新线程进来时,提交任务到缓存队列
void offer(Runnable runnable);
//取出线程
Runnable take();
//获取队列中线程的数量
int size();
}
没什么特别,只是继承了Runtime异常,换了一个名字而已
//自定义异常
public class RunnableDenyException extends RuntimeException {
public RunnableDenyException(String msg){
super(msg);
}
}
拒绝策略,我定义了三种策略,在代码中有注释可以体现
import Thread.ThreadPool.RunnableDenyException;
@FunctionalInterface
//这个类定义了当缓存队列达到上限的时候,将通过什么方式来通知提交者,实现了默认的三种方法
public interface DenyPolicy {
void reject(Runnable runnable, ThreadPool threadPool);
//直接丢弃线程,什么都不做,不通知
class DiscardDenyPolicy implements DenyPolicy {
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
}
}
//抛出异常通知
class AbortDenyPolicy implements DenyPolicy {
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
throw new RunnableDenyException("这个线程:" + runnable + " 将会被丢弃");
}
}
//使线程在提交者所在的线程中运行
class RunnerDenyPolicy implements DenyPolicy {
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
if (!threadPool.isShutdown()) {
runnable.run();
}
}
}
}
创建线程的工厂,用于创建线程
@FunctionalInterface
//创建线程的工厂
public interface ThreadFactory {
Thread creatThread(Runnable runnable);
}
使用了linkedList作为队列,进队的offer方法要判断队列是否满,没有满则加入队列并唤醒,出队只需要判断队列是否为空,如果为空就阻塞等待
这里也可以使用自己写的Queue方法,同样具有进队出队功能,最后会把代码放上
import java.util.LinkedList;
public class LinkedRunnableQueue implements RunnableQueue {
//任务队列的最大容量
private final int limit;
//容量最大时,需要使用的拒绝策略
private final DenyPolicy denyPolicy;
//存放任务的队列
private final LinkedList<Runnable> runnableLinkedList;
private final ThreadPool threadPool;
public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
this.limit = limit;
this.denyPolicy = denyPolicy;
this.threadPool = threadPool;
runnableLinkedList = new LinkedList<>();
}
@Override
public void offer(Runnable runnable) {
synchronized (runnableLinkedList) {
//如果缓存数量超过最大值,则使用拒绝策略
if (runnableLinkedList.size() >= limit) {
denyPolicy.reject(runnable, threadPool);
} else {
//成功加入list的末尾,并唤醒阻塞中的线程
runnableLinkedList.addLast(runnable);
runnableLinkedList.notifyAll();
}
}
}
@Override
public Runnable take() {
synchronized (runnableLinkedList) {
//如果缓存队列为空,则挂起,等待新的任务进来唤醒
while (runnableLinkedList.isEmpty()) {
try {
runnableLinkedList.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return runnableLinkedList.removeFirst();
}
@Override
public int size() {
synchronized (runnableLinkedList){
//返回list中的个数
return runnableLinkedList.size();
}
}
}
//实现Runnable,用于线程池内部,该类会用到RunnableQueue,会不断的从队列中拿出线程并运行
public class InternalTask implements Runnable {
private final RunnableQueue runnableQueue;
private volatile boolean running = true;
public InternalTask(RunnableQueue runnableQueue) {
this.runnableQueue = runnableQueue;
}
@Override
public void run() {
//如果当前线程在运行中切没有被中断,则不断从缓存队列中拿出线程运行
while (running && !Thread.currentThread().isInterrupted()){
try {
Runnable task = runnableQueue.take();
task.run();
} catch (Exception e) {
running = false;
break;
}
}
}
//停止当前任务,会在shutdown中使用
public void stop(){
this.running = false;
}
}
package Thread.ThreadPool;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class BasicThreadPool extends Thread implements ThreadPool {
//初始化线程池的数量
private final int initSize;
//线程池最大线程数
private final int maxSize;
//线程池核心线程数
private final int coreSize;
//当前活跃线程的数量
private int activeCount;
//创建线程的工厂
private final ThreadFactory threadFactory;
//任务队列
private final RunnableQueue runnableQueue;
//线程是否被摧毁
private volatile boolean isShutdown = false;
//工作队列
private final Queue<ThreadTask> internalTasks = new ArrayDeque<>();
//拒绝策略
private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
//看下面,自定义线程工厂
private final static ThreadFactory DEFAULT_THREAD_FACTORY =
new DefaultThreadFactory();
private final long keepAliveTime;
private final TimeUnit timeUnit;
//构造默认线程池时需要传入的参数:初始线程池的数量,最大线程的数量,核心线程数量,任务队列的最大数
public BasicThreadPool(int initSize, int maxSize, int coreSize,
int queueSize) {
this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY,
queueSize, DEFAULT_DENY_POLICY, 2,
TimeUnit.SECONDS);
}
public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize,
DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.init();
}
//初始化线程池并创建initSize个线程
private void init() {
//继承了Thread类,初始化时先启动自己
start();
IntStream.range(0, initSize).forEach(i -> newThread());
}
//创建新的任务线程并启动
private void newThread() {
InternalTask internalTask = new InternalTask(runnableQueue);
Thread thread = this.threadFactory.creatThread(internalTask);
ThreadTask threadTask = new ThreadTask(thread, internalTask);
internalTasks.offer(threadTask);
this.activeCount++;
thread.start();
}
private void removeThread() {
ThreadTask threadTask = internalTasks.remove();
threadTask.internalTask.stop();
this.activeCount--;
}
@Override
public void execute(Runnable runnable) {
if (this.isShutdown) {
throw new IllegalStateException("这个线程池已经被销毁了");
}
this.runnableQueue.offer(runnable);
}
@Override
public void run() {
//自动维护线程池
while (!isShutdown && !isInterrupted()) {
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
e.printStackTrace();
isShutdown = true;
break;
}
synchronized (this) {
if (isShutdown) {
break;
}
//当任务队列大于0,活跃线程小于核心线程的时候,扩容线程
if (runnableQueue.size() > 0 && activeCount < coreSize) {
IntStream.range(initSize, coreSize).forEach(i -> newThread());
continue;
}
if (runnableQueue.size() > 0 && activeCount < maxSize) {
IntStream.range(coreSize, maxSize).forEach(i -> newThread());
}
if (runnableQueue.size() == 0 && activeCount > coreSize) {
IntStream.range(coreSize, activeCount).forEach(i -> removeThread());
}
}
}
}
@Override
public void shutdown() {
}
//这一段方法不是特别重要,就有读者自己写
@Override
public int getInitSize() {
return 0;
}
@Override
public int getMaxSize() {
return 0;
}
@Override
public int getCoreSize() {
return 0;
}
@Override
public int getActiveCount() {
return 0;
}
@Override
public int getQueueSize() {
return 0;
}
@Override
public boolean isShutdown() {
return this.isShutdown;
}
//把线程和internalTask一个组合
private static class ThreadTask {
public ThreadTask(Thread thread, InternalTask internalTask) {
this.thread = thread;
this.internalTask = internalTask;
}
Thread thread;
InternalTask internalTask;
}
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
private static final ThreadGroup group = new ThreadGroup("我的线程-" +
GROUP_COUNTER.getAndDecrement());
private static final AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public Thread creatThread(Runnable runnable) {
return new Thread(group, runnable, "线程池-" + COUNTER.getAndDecrement());
}
}
}
好了,基础线程池已经完成测试代码
public class Main {
public static void main(String[] args) {
final ThreadPool threadPool =new BasicThreadPool(2,6,4,100);
for (int i = 0;i<=20;i++){
threadPool.execute(()->{
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"开始了");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
最后给出一个Queue的实现类,有不对的请指正
import java.util.Iterator;
import java.util.NoSuchElementException;
public class Queue<Item> implements Iterable<Item> {
private int N;
private Node first;
private Node last;
@Override
public Iterator<Item> iterator() {
return new ListIterator();
}
public class Node {
private Item item;
private Node next;
}
public Queue() {
first = null;
last = null;
}
public boolean isEmpty() {
return first == null;
}
public int Size() {
return N;
}
public Item peek() {
return first.item;
}
public void enqueue(Item item) {
Node x = new Node();
x.item = item;
if (isEmpty()) {
first = x;
last = x;
} else {
last.next = x;
last = x;
}
N++;
}
public Item dequeue() {
if (isEmpty()) {
throw new RuntimeException("队列为空,不能出");
}
Item item = first.item;
first = first.next;
if (isEmpty()) last = null;
return item;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Item item : this) {
sb.append(item + " ");
}
return sb.toString();
}
private class ListIterator implements Iterator<Item> {
private Node current = first;
public boolean hasNext() {
return current != null;
}
public void remove() {
throw new UnsupportedOperationException();
}
public Item next() {
if (!hasNext()) throw new NoSuchElementException();
Item item = current.item;
current = current.next;
return item;
}
}
}
如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!
加入交流群
请使用微信扫一扫!