首页 > 基础资料 博客日记

Java 并发集合:阻塞队列集合介绍

2024-08-11 21:00:06基础资料围观275

Java资料网推荐Java 并发集合:阻塞队列集合介绍这篇文章给大家,欢迎收藏Java资料网享受知识的乐趣

大家好,我是栗筝i,这篇文章是我的 “栗筝i 的 Java 技术栈” 专栏的第 028 篇文章,在 “栗筝i 的 Java 技术栈” 这个专栏中我会持续为大家更新 Java 技术相关全套技术栈内容。专栏的主要目标是已经有一定 Java 开发经验,并希望进一步完善自己对整个 Java 技术体系来充实自己的技术栈的同学。与此同时,本专栏的所有文章,也都会准备充足的代码示例和完善的知识点梳理,因此也十分适合零基础的小白和要准备工作面试的同学学习。当然,我也会在必要的时候进行相关技术深度的技术解读,相信即使是拥有多年 Java 开发经验的从业者和大佬们也会有所收获并找到乐趣。

在多线程编程中,如何有效地进行线程间通信和协调是一个关键问题。Java 并发包中的阻塞队列集合(BlockingQueue)为开发者提供了强大的工具,能够简化线程同步与数据共享的复杂性。阻塞队列不仅能够在生产者和消费者之间进行线程安全的数据传递,还通过自动的阻塞和唤醒机制,帮助我们轻松实现高效的生产者-消费者模型。本篇文章将详细介绍 Java 中几种常用的阻塞队列集合,分析它们的特点、应用场景及实现原理,帮助您更好地理解并掌握这些并发工具。



1、Java 阻塞队列的介绍

1.1、Java 阻塞队列概述

Java 中的阻塞队列(BlockingQueue)是一种在多线程环境下用于线程安全的数据结构,它不仅提供了典型的队列操作(如插入和移除),还可以在队列为空或满时自动阻塞操作线程,直到队列状态允许操作的继续。阻塞队列通过阻塞和等待机制有效地协调生产者和消费者线程之间的操作,确保数据一致性和线程安全。

以下是其主要功能和应用场景:

  • 线程间通信:阻塞队列在生产者-消费者模型中扮演了关键角色,它允许生产者线程和消费者线程之间进行线程安全的数据传递。具体表现为:

    • 生产者线程:生产者线程将数据放入队列中。如果队列已满,生产者线程将被阻塞,直到队列有空闲空间。

    • 消费者线程:消费者线程从队列中取出数据。如果队列为空,消费者线程将被阻塞,直到有数据可供消费。

  • 流量控制:通过阻塞机制,阻塞队列可以有效地控制生产者和消费者的工作节奏,避免过载和资源浪费。当队列达到容量上限时,阻塞队列会自动阻止进一步的插入操作,直到有空间可用,从而避免过载和资源浪费。

  • 简化并发编程:阻塞队列封装了复杂的同步机制,简化了多线程环境下的数据共享和线程协调,使得开发者可以专注于业务逻辑,而不必担心线程安全问题。

1.2、Java 阻塞队列接口

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,继承自 Queue 接口。它提供了额外的阻塞操作,例如在队列为空时等待元素变得可用,或在队列已满时等待空间变得可用。

BlockingQueue 阻塞队列在 Java 中的主要实现有三个:

  1. ArrayBlockingQueue: 基于数组实现的有界阻塞队列,必须指定固定容量,支持可选的公平性策略。
  2. LinkedBlockingQueue: 基于链表实现的阻塞队列,默认无界或指定容量,有较高的插入和删除性能。
  3. SynchronousQueue: 一个没有内部容量的队列,每个插入操作必须等待一个对应的删除操作,反之亦然,适用于直接交换数据的场景。
1.3、Java 阻塞队列与非阻塞队列

阻塞队列和非阻塞队列的区别:

操作方式

  • 阻塞队列:阻塞队列在插入或移除操作无法立即执行时,线程会被阻塞,直到操作可以继续。例如,当队列已满时,插入操作会被阻塞;当队列为空时,移除操作会被阻塞。
  • 非阻塞队列:非阻塞队列在插入或移除元素时不会阻塞线程。如果队列满了,插入操作可能会立即失败;如果队列为空,移除操作可能会立即返回空值或失败

线程安全性

  • 阻塞队列:阻塞操作通常通过同步机制来实现,比如使用锁或条件变量。在 ArrayBlockingQueueLinkedBlockingQueue 中,生产者线程会在队列满时进入等待状态,直到消费者线程移除元素,释放出空间。
  • 非阻塞队列:非阻塞队列通常使用无锁算法(例如 CAS 操作)来实现线程安全,这样即使在高并发情况下,操作也不会造成线程阻塞。

适用场景

  • 阻塞队列:生产者-消费者模式 – 在生产者和消费者线程之间传递数据,生产者插入数据到队列,消费者从队列中移除数据。阻塞队列可以有效地平衡生产者和消费者的速度,避免出现生产者过快导致队列溢出或消费者过慢导致队列空转的问题。
  • 非阻塞队列:高并发场景 – 在高并发场景下,非阻塞队列由于避免了锁的使用,性能更高,更适合需要高吞吐量和低延迟的应用。

2、Java 阻塞队列的具体实现

我们这里以 ArrayBlockingQueue 为例,来看 Java 对于阻塞队列的具体实现。

2.1、数据结构

ArrayBlockingQueue 类使用一个数组 items 存储队列元素,并通过 takeIndexputIndex 字段来跟踪下一个取出和放入元素的索引,同时用 count 记录当前队列中的元素数量。为了确保线程安全,ArrayBlockingQueue 使用 ReentrantLock 作为主要锁,配合 Condition 对象 notEmptynotFull 分别用于管理线程在队列为空或已满时的等待和通知。itrs 字段用于维护当前活动的迭代器的状态,允许队列操作在进行元素添加或移除时保持迭代器的一致性。如果没有活动的迭代器,itrs 将为 null

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 
{

		// 省略其他方法和实现细节
    ...

    /** 队列中的元素 */
    @SuppressWarnings("serial") // 有条件序列化
    final Object[] items;

    /** 下一个 take、poll、peek 或 remove 操作的元素索引 */
    int takeIndex;

    /** 下一个 put、offer 或 add 操作的元素索引 */
    int putIndex;

    /** 队列中的元素数量 */
    int count;

    /*
     * 并发控制使用经典的双条件算法,
     * 这是任何教科书中都可以找到的。
     */

    /** 保护所有访问的主要锁 */
    final ReentrantLock lock;

    /** 用于等待取元素的条件 */
    @SuppressWarnings("serial")  // 实现 Condition 的类可能是可序列化的。
    private final Condition notEmpty;

    /** 用于等待放入元素的条件 */
    @SuppressWarnings("serial")  // 实现 Condition 的类可能是可序列化的。
    private final Condition notFull;

    /**
     * 当前活动迭代器的共享状态,如果没有已知的迭代器,则为 null。
     * 允许队列操作更新迭代器状态。
     */
    transient Itrs itrs;
  
    // 省略其他方法和实现细节
    ...
}

关于 ConditionCondition 是 Java 并发库中的一个接口,它提供了一种线程间通信机制,使得线程能够在特定条件下等待和通知。Condition 的主要作用是为线程提供等待和通知机制,以便协调和管理线程的执行顺序。

Condition 的主要功能:

  • 等待(Wait):线程可以在 Condition 上调用 await() 方法进入等待状态,直到其他线程发出通知(即 signal()signalAll());await() 方法使当前线程释放持有的锁,并进入等待队列,直到条件满足或被中断。此时线程会自动重新获取锁。
  • 通知(Signal):通过 Conditionsignal() 方法,线程可以通知一个等待在该 Condition 上的线程,使其从等待状态中恢复并重新获取锁;使用 signalAll() 方法可以通知所有在该 Condition 上等待的线程,唤醒它们。
  • 超时等待:await(long time, TimeUnit unit) 方法允许线程在等待时指定超时时间。如果超时,线程会自动从等待状态恢复,并且不需要手动调用 signal()

Condition 和直接调用线程的方式相比:Condition 提供了更高级、更灵活的线程协调机制,能够与 ReentrantLock 配合使用,适合处理复杂的并发控制和条件等待需求。它支持多条件和超时等待等高级功能。

2.2、插入操作

ArrayBlockingQueue 通过 ReentrantLockCondition 实现线程安全的元素插入操作。offer(E e) 尝试将元素 e 插入队列,如果队列未满则成功插入,否则返回 falseput(E e) 在队列已满时阻塞线程直到有空间,并插入元素。offer(E e, long timeout, TimeUnit unit) 在指定超时时间内尝试插入元素,超时后返回 falseenqueue(E e) 将元素插入队列的尾部,更新索引,增加元素计数,并通知等待取元素的线程。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 
{

    // 省略其他方法和实现细节
    ...

    /**
     * 尝试将指定元素插入队列的尾部,如果队列容量未满则立即成功插入,
     * 插入成功时返回 {@code true},如果队列已满则返回 {@code false}。
     * 这个方法通常比 {@link #add} 方法更可取,因为后者在插入失败时会抛出异常。
     *
     * @throws NullPointerException 如果指定的元素为 null
     */
    public boolean offer(E e) {
        Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常
        final ReentrantLock lock = this.lock; // 获取锁对象
        lock.lock(); // 获取锁
        try {
            if (count == items.length) // 如果队列已满
                return false; // 插入失败,返回 false
            else {
                enqueue(e); // 调用 enqueue 方法插入元素
                return true; // 插入成功,返回 true
            }
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    /**
     * 将指定元素插入队列的尾部,如果队列已满则阻塞当前线程直到有空间可用。
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常
        final ReentrantLock lock = this.lock; // 获取锁对象
        lock.lockInterruptibly(); // 获取锁,允许中断
        try {
            while (count == items.length) // 如果队列已满
                notFull.await(); // 等待直到队列有空间
            enqueue(e); // 调用 enqueue 方法插入元素
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    /**
     * 将指定元素插入队列的尾部,如果队列已满,则在指定的等待时间内等待空间可用。
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        Objects.requireNonNull(e); // 检查元素是否为 null,如果为 null 抛出异常
        long nanos = unit.toNanos(timeout); // 将超时时间转换为纳秒
        final ReentrantLock lock = this.lock; // 获取锁对象
        lock.lockInterruptibly(); // 获取锁,允许中断
        try {
            while (count == items.length) { // 如果队列已满
                if (nanos <= 0L) // 如果超时时间已过
                    return false; // 返回 false 表示插入失败
                nanos = notFull.awaitNanos(nanos); // 等待指定的时间,直到队列有空间
            }
            enqueue(e); // 调用 enqueue 方法插入元素
            return true; // 插入成功,返回 true
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    /**
     * 内部方法,将指定元素插入队列的尾部。
     */
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread(); // 确保当前线程持有锁
        // assert lock.getHoldCount() == 1; // 确保锁的持有计数为 1
        // assert items[putIndex] == null; // 确保插入位置为空
        final Object[] items = this.items; // 获取存储元素的数组
        items[putIndex] = e; // 将元素放入数组的指定位置
        if (++putIndex == items.length) putIndex = 0; // 更新插入索引,若超出数组长度则重置为 0
        count++; // 增加队列中的元素数量
        notEmpty.signal(); // 唤醒等待取元素的线程
    }

    // 省略其他方法和实现细节
    ...
}

2.3、获取操作

ArrayBlockingQueue 提供了多种获取元素的方法:poll() 尝试移除并返回队列头部的元素,如果队列为空则返回 nulltake() 移除并返回队列头部的元素,如果队列为空则阻塞当前线程直到有元素可用;poll(long timeout, TimeUnit unit) 在指定的超时时间内尝试移除并返回头部元素,若超时则返回 nullpeek() 返回队列头部的元素但不移除它,如果队列为空则返回 null。这些方法通过 ReentrantLockCondition 实现线程安全的队列操作,确保在并发环境下对队列的正确访问和管理

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 
{
    // 省略其他方法和实现细节
    ...

    /**
     * 从队列中移除并返回头部元素,如果队列为空则返回 null。
     *
     * @return 队列头部元素;如果队列为空则返回 null。
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 从队列中移除并返回头部元素,如果队列为空则阻塞当前线程,直到有元素可用。
     *
     * @return 队列头部元素。
     * @throws InterruptedException 如果线程在等待时被中断。
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空时,等待有元素可用
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 尝试从队列中移除并返回头部元素,在指定的超时时间内等待元素变得可用。
     *
     * @param timeout 等待的最大时间。
     * @param unit 时间单位。
     * @return 队列头部元素;如果在超时时间内未获得元素则返回 null。
     * @throws InterruptedException 如果线程在等待时被中断。
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout); // 将超时时间转换为纳秒
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0L)
                    return null; // 超时返回 null
                nanos = notEmpty.awaitNanos(nanos); // 等待指定的时间
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 返回队列头部的元素,但不移除它。如果队列为空,则返回 null。
     *
     * @return 队列头部元素;如果队列为空则返回 null。
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // 返回队列头部的元素;如果队列为空则返回 null
        } finally {
            lock.unlock();
        }
    }

    /**
     * 从当前的取元素位置提取元素,更新位置,并发出信号。
     * 仅在持有锁的情况下调用。
     *
     * @return 提取的元素。
     */
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex]; // 获取并强制转换元素
        items[takeIndex] = null; // 清除元素位置
        if (++takeIndex == items.length) takeIndex = 0; // 更新取出位置索引
        count--; // 减少队列中的元素数量
        if (itrs != null)
            itrs.elementDequeued(); // 更新迭代器状态(如果存在)
        notFull.signal(); // 通知生产者线程队列有空间
        return e;
    }
    
    // 省略其他方法和实现细节
    ...
}


3、Java 阻塞队列知识点拓展

3.1、ArrayBlockingQueue 和 LinkedBlockingQueue 之间的区别

ArrayBlockingQueueLinkedBlockingQueue 是 Java 中的两种阻塞队列,它们在实现和使用场景上有显著的区别:

数据结构

  • ArrayBlockingQueue:使用固定大小的数组实现;内部是一个环形缓冲区(Circular Buffer),容量在初始化时设定,不可动态调整。

  • LinkedBlockingQueue:使用链表实现;内部是一个双端链表,容量可以在初始化时设定,默认最大容量为 Integer.MAX_VALUE,并且可以动态扩展。

存储特性

  • ArrayBlockingQueue:内存占用固定,因为容量在创建时就已经确定;适合容量已知且稳定的场景。

  • LinkedBlockingQueue:内存占用根据实际存储的元素数量动态变化;适合容量不确定或需要动态调整的场景。

性能

  • ArrayBlockingQueue:插入和删除操作较为高效,因为是基于数组的索引操作;更好的缓存局部性,适合固定容量的高并发场景。

  • LinkedBlockingQueue:插入和删除操作涉及链表节点的操作,可能稍慢;适合动态容量变化的场景,内存使用灵活。

使用场景

  • ArrayBlockingQueue:用于容量固定的情况,如固定大小的线程池任务队列;内存占用稳定,性能预测性强。

  • LinkedBlockingQueue:用于需要大容量或容量可变的情况,如任务缓存队列;内存使用灵活,适应变化的工作负载。

总结:ArrayBlockingQueue 是基于固定大小数组的阻塞队列,适合固定容量的应用场景;而 LinkedBlockingQueue 是基于链表的阻塞队列,适合容量不确定的场景。二者在性能、内存占用和适用场景上各有特点。

3.2、关于 SynchronousQueue 的介绍

SynchronousQueue 是 Java 中的一种特殊的阻塞队列,其实现和行为与传统的阻塞队列有显著不同。以下是 SynchronousQueue 的大致实现和特点:

数据结构SynchronousQueue 不使用任何内部存储结构来保存元素。即,它不持有任何实际的元素。每个插入操作必须等待一个线程来执行移除操作,反之亦然。即,插入和移除操作是直接配对的。

操作特性

  • 插入操作 (put()offer()):当调用 put()offer() 方法插入元素时,当前线程会被阻塞,直到另一个线程调用 take()poll() 方法从队列中移除该元素。这种机制确保了每个插入操作都有一个对应的移除操作。

  • 移除操作 (take()poll()):当调用 take()poll() 方法移除元素时,当前线程会被阻塞,直到另一个线程调用 put()offer() 方法将元素插入队列。这种机制确保了每个移除操作都有一个对应的插入操作。

线程交互SynchronousQueue 实际上可以被视为一 “零容量” 队列,因为它不存储任何元素。插入和移除操作是完全同步的,必须在操作之间进行配对。

使用场景:常用于需要直接交换任务和线程的场景,如线程池的工作队列。线程池中的工作线程可以直接从队列中获取任务,而不需要额外的存储空间。

内部实现SynchronousQueue 使用一组条件变量来实现线程间的配对机制。使用 ReentrantLockCondition 对象来管理线程的等待和通知。

总的来说,SynchronousQueue 是一种特殊的阻塞队列,不存储元素,所有的插入操作都需要有相应的移除操作配对。它用于需要直接交换数据或任务的场景,如线程池的工作队列,其实现基于条件变量来管理线程的同步操作。



文章来源:https://blog.csdn.net/weixin_45187434/article/details/141033227
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云