Java线程池

一、介绍

线程池,顾名思义,这是管理一堆线程而出现的对象。与数据库的连接池一致,它的出现解决了线程的频繁创建和销毁,从而浪费大量资源的问题。

所以,线程池中有提前创建好的线程,使用时直接分配获取,使用完再由线程池管理是否销毁。

优点

  • 降低资源消耗,也就是不需要重复多次的创建线程
  • 更好的管理线程
    • 比如可以获取当前运行的线程是什么
    • 还在等待执行的任务有什么

二、使用线程池

在JDK5起提供了线程池的对象,ExecutorServiceExecutors

其中,ExecutorService和它的子类ThreadPoolExecutor是线程池的关键

Executors是对应的工具类,里面有些工厂方法可以快速创建线程池

查看ThreadPoolExecutor的构造方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
}
参数 说明
corePoolSize 核心线程数。就算目前空闲,也不会回收这几个线程
maximumPoolSize 最大线程数。当前线程池可以容纳的最大线程数量
keepAliveTime 线程保持存活时间。如果线程空间时间到达,将会进行销毁(除了核心线程)
unit keepAliveTime一起使用,仅是个时间单位
workQueue 工作等待队列。当线程池所有的线程都繁忙运行时,新添加的执行任务会暂时保留至此队列
threadFactory 创建线程的线程工厂
handler 拒绝策略。当队列满了后,还有执行任务进入时的策略

workQueue参数需要传入一个BlockingQueue,这是个阻塞队列。BlockingQueue内部使用两条队列,允许两个线程同时向队列一个存储,一个取出操作。在保证并发安全的同时,提高了队列的存取效率,不能传入空对象,可设置容量大小,也可以不设置容量大小,那么它的容量就是Integer.MAX_VALUE。常用的几种实现类

说明
ArrayBlockingQueue 规定容量大小的阻塞队列
LinkedBlockingQueue 既可以规定容量大小,也可以不规定的阻塞队列
SynchronizedQueue 一个特殊的队列,生产消费必须交替完成的队列
生产一个元素后,必须要有进行消费后,才能继续往队列内生产元素

handler拒绝策略

当线程池指定的队列容量满了时,将执行哪种拒绝任务的策略

策略类 说明
AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
DiscardPolicy 不执行新任务,也不抛出异常
DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
CallerRunsPolicy 直接在外层调用者的线程中调用新任务

1)小试牛刀

package com.banmoon.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo1 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        executorService.execute(new MyRunnable());
        // lambda表达式
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName());
        });
        // 关闭线程池,如果不关闭,线程池将一直存在,池子内保留着核心线程,等待着调用
        executorService.shutdown();
    }

}

class MyRunnable implements Runnable{

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

image-20211211195656264

2)Executors工具类

关于此的三个相关方法源码,其中还有一些他们的重载,这边就不细细讲了。

这些工具类方法,主要是快速创建ThreadPoolExecutor对象的方法,只是它们的参数各有所不同

public class Executors {
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}
方法 参数说明 效果
newCachedThreadPool 核心线程数为0
最大线程数已调到Integer.MAX_VALUE
每提交一个线程任务,都将新创建一个新的线程来执行
如果需要执行的任务很多,这有可能会导致CPU100%的问题
newFixedThreadPool 核心线程数和最大线程数一致
但队列长度为Integer.MAX_VALUE
提交的任务将正常交给池子中的线程执行,执行完成也不会销毁,等待执行新的任务
如果执行的任务很多,队列会一直添加任务等待执行,可能会造成内存溢出的问题
newSingleThreadExecutor 核心线程数和最大线程数都为1
但队列长度为Integer.MAX_VALUE
newFixedThreadPool类似,但池子中只有一个线程

根据需要来进行使用合适的线程池,测试下他们的执行方式和快慢

package com.banmoon.pool;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo2 {

    public static void main(String[] args) {
        ExecutorService executorService1 = Executors.newCachedThreadPool();
        ExecutorService executorService2 = Executors.newFixedThreadPool(10);
        ExecutorService executorService3 = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 100; i++) {
            executorService1.execute(new MyDemo2(i));
//            executorService2.execute(new MyDemo2(i));
//            executorService3.execute(new MyDemo2(i));
        }

        executorService1.shutdown();
        executorService2.shutdown();
        executorService3.shutdown();

    }

}

class MyDemo2 implements Runnable {

    private Integer i;

    public MyDemo2(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        System.out.println(StrUtil.format("{}:{},时间:{}", Thread.currentThread().getName(), i, DateUtil.now()));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

newCachedThreadPool执行结果可以看到,一共有100个线程被创建出来

image-20211212150430276

newFixedThreadPool执行结果,执行的永远都是那几个固定的线程,这里我们指定了10个线程,所以打印也是10个为一批来进行的。

image-20211212150922546

newSingleThreadExecutor执行结果,从头到尾就只有一个线程在执行

image-20211212154044131

3)线程工厂

虽然有默认的线程工厂,但如果有需要进行处理的话,还是得记录一下

package com.banmoon.pool;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class Demo3 {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), new MyThreadFactory("BANMOON-TEST"));
        for (int i = 0; i < 100; i++) {
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executor.shutdown();
    }

}


class MyThreadFactory implements ThreadFactory{
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private String poolName;

    MyThreadFactory(String poolName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        String threadName = poolName + "-" + threadNumber.getAndIncrement();
        Thread t = new Thread(group, r, threadName, 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }

}

执行结果

image-20211212163007164

4)拒绝策略

拒绝策略没什么好讲的,平常在使用时,注意下容量的大小,以及使用的策略。自己需要执行的任务数量多少,会不会照成内存溢出等,从这几个方面入手,选择最适合业务的队列容量和拒绝策略。

策略类 说明
AbortPolicy 默认,不执行新任务,直接抛出异常,提示线程池已满
DiscardPolicy 不执行新任务,也不抛出异常
DiscardOldestPolicy 它丢弃最老的未处理请求,然后重试执行,除非执行程序被关闭,在这种情况下任务被丢弃。
CallerRunsPolicy 直接在外层调用者的线程中调用新任务

演示CallerRunsPolicy,会在调用者的线程中,执行超出容量的任务

package com.banmoon.pool;

import java.util.concurrent.*;

public class Demo4 {

    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 1; i <= 100; i++) {
            executorService.execute(new MyDemo4(i));
        }
        executorService.shutdown();
    }

}

class MyDemo4 implements Runnable{

    private Integer i;

    public MyDemo4(Integer i) {
        this.i = i;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + ":" + i);
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果,上述线程池指定了最大线程数为20,队列容量为20。所以当执行第41个任务时,队列满了,将由调用者的线程来执行这个任务,此处是主线程

image-20211212164523094

三、其他

1)执行任务的优先级

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    public void execute(Runnable command) {
        // 判断是否为空
        if (command == null)
            throw new NullPointerException();
        
        // 判断当前正在运行的线程数是否小于核心线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 添加任务至线程执行,成功添加则结束
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果核心线程都有在运行,将任务放至队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 如果成功推入队列,将再次检查线程状态,有线程死亡则将当前任务添加至线程执行
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果队列推入任务失败了,那将直接添加至线程执行
        else if (!addWorker(command, false))
            // 如果任务添加至线程失败,则将进行拒绝策略
            reject(command);
    }
    
    /**
     * 会从线程工厂获取线程,并添加执行任务
     * @param firstTask 执行的任务
     * @param core 是否可以添加至核心线程
     * @return true:成功添加至线程执行
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // ...
    }
}

image-20211212183817010

四、最后

线程池这东西干货还是挺多的,还有挺多没有整理完。比如说addWorker方法,线程池的执行调度等

后续有什么新的理解继续补上,未完待续

欢迎来登录我的个人博客

入我相思门,知我相思苦, 长相思兮长相忆,短相思兮无穷极, 早知如此绊人心,何如当初莫相识。

更多文章请关注《万象专栏》

本栏目由《康祺惠购APP》独家赞助