Java线程池原理——自己手写线程池


prtyaa
prtyaa 2023-12-30 21:24:35 53384 赞同 0 反对 0
分类: 资源
所谓线程池,就是一个用来放线程的池子,里面存放着已经创建好的线程,当有任务提交的时候,池子里面的某个线程会执行这个任务,当任务结束后,线程又回到了池子里面等待下一个任务。当任务太多的时候,池子就要自动加水,创建更多的线程,但池子也是有容量的;当任务太少的时候,池子就要放掉一些线程,以免资源浪费。

为什么需要线程池?因为线程是一个计算机的重要资源,创建,启动,销毁线程都是比较耗费计算机资源的,而且线程是有限的,当有时候请求过多的时候,不可能根据请求的多少而创建多少个线程,所以就需要线程池对线程进行管理和重复利用。

通过上面的介绍线程的特点可以推断出,一个完整的线程池应该具有以下功能:

  • 任务队列:用于缓存已经提交的任务。
  • 线程数量管理:线程池应该自动控制线程池里面线程的数量,可以通过以下三个参数来实现:(1)初始线程数init,线程池能容纳最大线程数max,线程的活跃数量:core,三者的关系是int<=core<=max
  • 任务拒绝策略:当线程数量达到上限且任务队列满的时候,需要有对应的拒绝策略来通知提交者线程被拒绝。
  • 线程工厂:用于自定义线程,比如设置线程名称等。
  • 提交队列:用于存放提交的线程,需要有数量限制。
  • 活跃时间:线程自动维护的时间。

通过上面的定义,就可以自己写一个线程池,首先是接口类:

ThreadPool

线程池接口,定义了线程池的一些方法,其中最主要的是execute,shutdown,后面主要实现这两个方法。
public interface ThreadPool {
    //提交任务到线程池
    void execute(Runnable runnable);
    //关闭
    void shutdown();
    //获取线程池初始化时的线程大小
    int getInitSize();
    //获取线程池最大线程输
    int getMaxSize();
    //获取线程池核心线程数量
    int getCoreSize();
    //获取活跃线程数量
    int getActiveCount();
    //获取线程池缓存队列大小
    int getQueueSize();
    //查看线程是否被销毁
    boolean isShutdown();

}

RunnableQueue

//缓存提交到线程池的队列任务
public interface RunnableQueue {
    //新线程进来时,提交任务到缓存队列
    void offer(Runnable runnable);
    //取出线程
    Runnable take();
    //获取队列中线程的数量
    int size();
}

自定义异常

没什么特别,只是继承了Runtime异常,换了一个名字而已
//自定义异常
public class RunnableDenyException extends RuntimeException {
    public RunnableDenyException(String msg){
        super(msg);
    }
}

DenyPolicy

拒绝策略,我定义了三种策略,在代码中有注释可以体现
import Thread.ThreadPool.RunnableDenyException;

@FunctionalInterface
//这个类定义了当缓存队列达到上限的时候,将通过什么方式来通知提交者,实现了默认的三种方法
public interface DenyPolicy {
    void reject(Runnable runnable, ThreadPool threadPool);


    //直接丢弃线程,什么都不做,不通知
    class DiscardDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {

        }
    }

    //抛出异常通知
    class AbortDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RunnableDenyException("这个线程:" + runnable + " 将会被丢弃");
        }
    }

    //使线程在提交者所在的线程中运行
    class RunnerDenyPolicy implements DenyPolicy {
        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            if (!threadPool.isShutdown()) {
                runnable.run();
            }
        }
    }
}

ThreadFactory

创建线程的工厂,用于创建线程
@FunctionalInterface
//创建线程的工厂
public interface ThreadFactory {

    Thread creatThread(Runnable runnable);
}

接口定义完成后,就开始写接口的实现类了

缓存队列

使用了linkedList作为队列,进队的offer方法要判断队列是否满,没有满则加入队列并唤醒,出队只需要判断队列是否为空,如果为空就阻塞等待
这里也可以使用自己写的Queue方法,同样具有进队出队功能,最后会把代码放上
import java.util.LinkedList;

public class LinkedRunnableQueue implements RunnableQueue {
    //任务队列的最大容量
    private final int limit;
    //容量最大时,需要使用的拒绝策略
    private final DenyPolicy denyPolicy;
    //存放任务的队列
    private final LinkedList<Runnable> runnableLinkedList;
    private final ThreadPool threadPool;

    public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
        runnableLinkedList = new LinkedList<>();
    }

    @Override
    public void offer(Runnable runnable) {
        synchronized (runnableLinkedList) {
            //如果缓存数量超过最大值,则使用拒绝策略
            if (runnableLinkedList.size() >= limit) {
                denyPolicy.reject(runnable, threadPool);
            } else {
                //成功加入list的末尾,并唤醒阻塞中的线程
                runnableLinkedList.addLast(runnable);
                runnableLinkedList.notifyAll();
            }
        }
    }

    @Override
    public Runnable take() {
        synchronized (runnableLinkedList) {
            //如果缓存队列为空,则挂起,等待新的任务进来唤醒
            while (runnableLinkedList.isEmpty()) {
                try {
                    runnableLinkedList.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return runnableLinkedList.removeFirst();
    }

    @Override
    public int size() {
        synchronized (runnableLinkedList){
            //返回list中的个数
            return runnableLinkedList.size();
        }
    }
}

继承Runnable

//实现Runnable,用于线程池内部,该类会用到RunnableQueue,会不断的从队列中拿出线程并运行
public class InternalTask implements Runnable {

    private final RunnableQueue runnableQueue;
    private volatile boolean running = true;

    public InternalTask(RunnableQueue runnableQueue) {
        this.runnableQueue = runnableQueue;
    }

    @Override
    public void run() {
        //如果当前线程在运行中切没有被中断,则不断从缓存队列中拿出线程运行
        while (running && !Thread.currentThread().isInterrupted()){
            try {
                Runnable task = runnableQueue.take();
                task.run();
            } catch (Exception e) {
                running = false;
                break;
            }
        }
    }
    //停止当前任务,会在shutdown中使用
    public void stop(){
        this.running = false;
    }
}

最后是线程池的具体实现,有注释

package Thread.ThreadPool;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;


public class BasicThreadPool extends Thread implements ThreadPool {
    //初始化线程池的数量
    private final int initSize;
    //线程池最大线程数
    private final int maxSize;
    //线程池核心线程数
    private final int coreSize;
    //当前活跃线程的数量
    private int activeCount;
    //创建线程的工厂
    private final ThreadFactory threadFactory;
    //任务队列
    private final RunnableQueue runnableQueue;
    //线程是否被摧毁
    private volatile boolean isShutdown = false;
    //工作队列
    private final Queue<ThreadTask> internalTasks = new ArrayDeque<>();
    //拒绝策略
    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    //看下面,自定义线程工厂
    private final static ThreadFactory DEFAULT_THREAD_FACTORY =
            new DefaultThreadFactory();
    private final long keepAliveTime;
    private final TimeUnit timeUnit;


    //构造默认线程池时需要传入的参数:初始线程池的数量,最大线程的数量,核心线程数量,任务队列的最大数
    public BasicThreadPool(int initSize, int maxSize, int coreSize,
                           int queueSize) {
        this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY,
                queueSize, DEFAULT_DENY_POLICY, 2,
                TimeUnit.SECONDS);
    }

    public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize,
                           DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init();
    }

    //初始化线程池并创建initSize个线程
    private void init() {
        //继承了Thread类,初始化时先启动自己
        start();
        IntStream.range(0, initSize).forEach(i -> newThread());
    }

    //创建新的任务线程并启动
    private void newThread() {
        InternalTask internalTask = new InternalTask(runnableQueue);
        Thread thread = this.threadFactory.creatThread(internalTask);
        ThreadTask threadTask = new ThreadTask(thread, internalTask);
        internalTasks.offer(threadTask);
        this.activeCount++;
        thread.start();
    }

    private void removeThread() {
        ThreadTask threadTask = internalTasks.remove();
        threadTask.internalTask.stop();
        this.activeCount--;
    }

    @Override
    public void execute(Runnable runnable) {
        if (this.isShutdown) {
            throw new IllegalStateException("这个线程池已经被销毁了");
        }
        this.runnableQueue.offer(runnable);
    }

    @Override
    public void run() {
       //自动维护线程池
        while (!isShutdown && !isInterrupted()) {
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
                isShutdown = true;
                break;
            }
            synchronized (this) {
                if (isShutdown) {
                    break;
                }
                //当任务队列大于0,活跃线程小于核心线程的时候,扩容线程
                if (runnableQueue.size() > 0 && activeCount < coreSize) {
                    IntStream.range(initSize, coreSize).forEach(i -> newThread());
                    continue;
                }
                if (runnableQueue.size() > 0 && activeCount < maxSize) {
                    IntStream.range(coreSize, maxSize).forEach(i -> newThread());
                }
                if (runnableQueue.size() == 0 && activeCount > coreSize) {
                    IntStream.range(coreSize, activeCount).forEach(i -> removeThread());
                }

            }
        }
    }

    @Override
    public void shutdown() {

    }
    //这一段方法不是特别重要,就有读者自己写
    @Override
    public int getInitSize() {
        return 0;
    }

    @Override
    public int getMaxSize() {
        return 0;
    }

    @Override
    public int getCoreSize() {
        return 0;
    }

    @Override
    public int getActiveCount() {
        return 0;
    }

    @Override
    public int getQueueSize() {
        return 0;
    }

    @Override
    public boolean isShutdown() {
        return this.isShutdown;
    }
    //把线程和internalTask一个组合
    private static class ThreadTask {
        public ThreadTask(Thread thread, InternalTask internalTask) {
            this.thread = thread;
            this.internalTask = internalTask;
        }

        Thread thread;
        InternalTask internalTask;
    }

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);
        private static final ThreadGroup group = new ThreadGroup("我的线程-" +
                GROUP_COUNTER.getAndDecrement());
        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        @Override
        public Thread creatThread(Runnable runnable) {
            return new Thread(group, runnable, "线程池-" + COUNTER.getAndDecrement());
        }
    }
}

好了,基础线程池已经完成测试代码

public class Main {
    public static void main(String[] args) {
        final ThreadPool threadPool =new BasicThreadPool(2,6,4,100);
        for (int i = 0;i<=20;i++){
            threadPool.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"开始了");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

最后给出一个Queue的实现类,有不对的请指正

import java.util.Iterator;
import java.util.NoSuchElementException;

public class Queue<Item> implements Iterable<Item> {
    private int N;
    private Node first;
    private Node last;

    @Override
    public Iterator<Item> iterator() {
        return new ListIterator();
    }

    public class Node {
        private Item item;
        private Node next;
    }

    public Queue() {
        first = null;
        last = null;
    }

    public boolean isEmpty() {
        return first == null;
    }

    public int Size() {
        return N;
    }

    public Item peek() {
        return first.item;
    }

    public void enqueue(Item item) {
        Node x = new Node();
        x.item = item;
        if (isEmpty()) {
            first = x;
            last = x;
        } else {
            last.next = x;
            last = x;
        }
        N++;
    }

    public Item dequeue() {
        if (isEmpty()) {
            throw new RuntimeException("队列为空,不能出");
        }
        Item item = first.item;
        first = first.next;
        if (isEmpty()) last = null;
        return item;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (Item item : this) {
            sb.append(item + " ");
        }
        return sb.toString();
    }

    private class ListIterator implements Iterator<Item> {
        private Node current = first;

        public boolean hasNext() {
            return current != null;
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

        public Item next() {
            if (!hasNext()) throw new NoSuchElementException();
            Item item = current.item;
            current = current.next;
            return item;
        }
    }
}

如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!

评价 0 条
prtyaaL2
粉丝 1 资源 1949 + 关注 私信
最近热门资源
银河麒麟桌面操作系统备份用户数据  125
统信桌面专业版【全盘安装UOS系统】介绍  120
银河麒麟桌面操作系统安装佳能打印机驱动方法  112
银河麒麟桌面操作系统 V10-SP1用户密码修改  105
最近下载排行榜
银河麒麟桌面操作系统备份用户数据 0
统信桌面专业版【全盘安装UOS系统】介绍 0
银河麒麟桌面操作系统安装佳能打印机驱动方法 0
银河麒麟桌面操作系统 V10-SP1用户密码修改 0
作者收入月榜
1

prtyaa 收益393.62元

2

zlj141319 收益218元

3

1843880570 收益214.2元

4

IT-feng 收益209.03元

5

风晓 收益208.24元

6

777 收益172.71元

7

Fhawking 收益106.6元

8

信创来了 收益105.84元

9

克里斯蒂亚诺诺 收益91.08元

10

技术-小陈 收益79.5元

请使用微信扫码

加入交流群

请使用微信扫一扫!