首页 > 基础资料 博客日记

Java开启线程执行异步任务的四种方式

2024-09-16 00:00:08基础资料围观253

文章Java开启线程执行异步任务的四种方式分享给大家,欢迎收藏Java资料网,专注分享技术知识

初始化线程的四种方式

在如今计算机硬件如此发达的时代,我们所使用的计算机CPU都是多核的,例如“12核24线程”。有时候与其纠结单个线程的极致性能优化,倒不如使用多线程来达到事半功倍的效果。那怎么开启多线程来计算呢,初始化线程的方式有四种:

继承Thread(无返回值)

首先创建一个类来继承Thread,然后重写run方法,run方法中用来写自己要在新线程做的事情

public class Thread01 extends Thread{
    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
    }
}

启动线程,创建Thread01对象,然后调用start方法

public class ThreadTest {
    public static void main(String[] args) {
        System.out.println("main...start...");
      
        Thread01 thread = new Thread01();
        thread.start(); // 启动线程
        
        System.out.println("main...end...");
    }
}

实现Runnable接口(无返回值)

实现Runnable接口的run方法

public class Runnable01 implements Runnable{
    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
    }
}

启动线程,创建Runnable01的对象并传给Thread的构造方法

public static void main(String[] args) {
    System.out.println("main...start...");
    
    Runnable01 runnable01 = new Runnable01();
    new Thread(runnable01).start();

    System.out.println("main...end...");
}

实现Callable接口+FutureTask(有返回值)

public class Callable01 implements Callable<T>,实现Callable接口的时候可以指定返回结果的数据类型,重写call方法的时候返回值也要同步修改,返回整数类型代码如下:

public class Callable01 implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("当前线程:" + Thread.currentThread().getName());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
        return i;
    }
}

启动方式

// 实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();

// 等待整个线程执行完成,获取返回结果
Integer integer = futureTask.get();

线程池

创建线程池

方式一:构造方法创建

【源码构造方法】

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

【示例】

new LinkedBlockingDeque<>() 默认是Integer的最大值,但这样可能导致内存不够,建议自己指定数量

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5,
        200,
        10,
        TimeUnit.SECONDS,
        new LinkedBlockingDeque<>(100000),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
);
线程池七大参数
corePoolSize 核心线程数

一直存在,除非设置了allowCoreThreadTimeOut

maximumPoolSize 最大线程数量

池里面允许的最大线程数量,控制资源使用量

keepAliveTime 存活时间

如果当前的线程数量大于core数量,释放空闲的线程(maximumPoolSize-corePoolSize),只要线程空闲时间大于指定的keepAliveTime。注意:核心线程不会释放

unit 时间单位

和上面的keepAliveTime配合组成完整的时间

workQueue 阻塞队列

如果任务有很多,就会将目前多的任务放在阻塞队列里面。只要有线程空闲,就会去队列里面取出新的任务继续执行。

threadFactory 线程的创建工厂

使用默认的就行

handler

如果队列满了,存储不了更多的任务了,按照我们指定的拒绝策略拒绝执行任务

默认使用:AbortPolicy

线程池工作流程

1、线程池创建的时候,准备好corePoolSize数量的核心线程,准备接受任务 2、核心线程都有任务了,就将再进来的任务放入阻塞队列中。空闲的核心线程就会自己去阻塞队列中获取任务来执行 3、阻塞队列满 了,就直接开新线程执行,最大数量只能开到maximumPoolSize 4、任务执行完成后,如果有很多空闲线程。在指定的时间keepAliveTime以后,释放maximumPoolSize-corePoolSize这些线程,保留corePoolSize个核心线程

问题:线程池参数:core7、max20、queue50。100并发进来怎么分配的?

答:7个会立即得到执行,50个会进入队列,再开13个进行执行。剩下的30个就使用拒绝策略。

方式二:静态方法创建
固定线程数的线程池
// 创建十个线程的线程池
ExecutorService service = Executors.newFixedThreadPool(10);

【源码】

核心线程数=最大线程数

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
线程都可以回收的线程池
ExecutorService executorService = Executors.newCachedThreadPool();

【源码】

核心线程数=0,即线程池的所有线程在空闲的时候都可以回收

最大线程数设置为整数类型最大值

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
单线程线程池

每次从队列中获取一个任务来执行

ExecutorService executorService = Executors.newSingleThreadExecutor();

【源码】

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

使用线程池的线程

使用线程池的submit方法和execute方法都可以使用线程池的线程来执行方法

  • submit方法有返回值

  • execute方法没有返回值

下面的代码使用了上述的Runnable01类和Callable01类

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @Author dam
 * @create 2024/7/14 18:23
 */
public class InitiativeTest {
    static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("开启第一个线程干活");
        Runnable01 runnable01 = new Runnable01();
        service.submit(runnable01);
        Thread.sleep(1000);
        System.out.println();

        System.out.println("开启第二个线程干活");
        // 第二个参数会传给Future,线程的任务执行之后,可以通过get方法获得
        Future<Integer> submit1 = service.submit(runnable01, 2);
        System.out.println("submit1:" + submit1.get());
        Thread.sleep(1000);
        System.out.println();

        System.out.println("开启第三个线程干活");
        Future<Integer> submit2 = service.submit(new Callable01());
        // 使用get可以获得Callable01实现call方法时返回的解雇
        System.out.println("submit2:" + submit2.get());
        Thread.sleep(1000);
        System.out.println();

        System.out.println("开启第四个线程干活");
        service.execute(runnable01);
    }
}

上面的异步任务都运行完成之后,程序也没有停止,因为线程池还没有销毁

使用线程池优点

上述三种直接创建线程的问题:如果一个任务过来就创建一个线程,高并发的时候,会直接资源耗尽,导致系统崩溃。使用线程池有如下优点:

  • 降低资源的消耗:通过重复利用已经创建好的线程,可以降低线程的创建和销毁带来的损耗
  • 提高响应速度:因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
  • 提高线程的可管理性:线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,因此可以使用线程池进行统一管理。管理包括关闭某个线程池、关闭某个线程,可以在服务器压力大的时候关掉一些无关紧要的线程,等服务器空闲之后再重新开启

总结

  • 1、2不能得到返回值。3可以获得返回值
  • 1、2、3都不能控制资源(每来一个项目就新招一个员工来干活)
  • 4可以控制资源(只有那么多员工,一个员工手上空闲了才做另一件事)

CPU能同时运行多少线程

我的CPU是24核32线程的,是不是我同时最多只能开32个线程来执行任务。为了验证这个,我将把线程池的核心线程数设置为一个大于32的数字,例如100。

CPU编号从0开始,所以最后一个CPU是31

1、实现Runnable接口,重写run方法,让每个线程执行100秒才结束

public class Runnable01 implements Runnable {
    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getName());
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

2、将线程池的核心线程设置为100,提交150个异步任务

public class ThreadTest {
    static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            100,
            200,
            10,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(100000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 150; i++) {
            Runnable01 runnable01 = new Runnable01();
            executor.execute(runnable01);
        }
    }
}

推论

只能有32个线程开始执行,因为CPU是32线程的

运行

竟然100个线程都开始执行了,这是为什么?

原因

在多核处理器上,操作系统和硬件架构允许创建的线程数量通常远超过物理核心的数量。这是因为现代CPU支持超线程技术,它可以让每个物理核心同时处理多个线程。虽然CPU只有24个物理核心,但因为超线程技术,有32个逻辑处理器。

然而,即使有超线程,创建的线程数量并不严格受限于逻辑处理器的数量。操作系统和硬件会通过调度机制来管理这些线程,确保它们能够被合理地分配到可用的计算资源上。这意味着,尽管你可能有100个线程同时运行,但实际在一个给定的时间点上,只有最多32个线程可以在逻辑处理器上执行。其余的线程将处于等待状态,直到轮到它们执行。

因此,设置比物理或逻辑核心数更多的线程可能会导致更频繁的上下文切换,增加调度开销,并不一定能提高程序的性能。理想的核心线程数应该基于应用程序的工作负载特性、I/O操作的频率以及具体的硬件配置来决定


文章来源:https://blog.csdn.net/laodanqiu/article/details/140423435
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云