Java并发编程之阻塞队列(2)

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** 底层存储结构-数组 */ final Object[] items; /** 队首元素下标 */ int takeIndex; /** 队尾元素下标 */ int putIndex; /**队列元素总数 */ int count;
/** 重入锁 */ final ReentrantLock lock; /** notEmpty等待条件 */ private final Condition notEmpty; /** notFull等待条件 */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;

  可以看到,ArrayBlockingQueue用来存储元素的实际上是一个数组。

  再看下ArrayBlockingQueue两个重要方法的实现,put()和take():

public void put(E e) throws InterruptedException { //先检查e是否为空 checkNotNull(e); //获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列已满,进入条件等待 while (count == items.length) notFull.await(); //队列不满,进行入队列操作 enqueue(e); } finally { //释放锁 lock.unlock(); } }

  再看下具体的入队操作:

private void enqueue(E x) { final Object[] items = this.items; //队尾入队 items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; //队列总数+1 count++; //notempty条件的等待集中随机选择一个线程,解除其阻塞状态 notEmpty.signal(); }

  下面是take()方法的源代码:

public E take() throws InterruptedException { //获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空 while (count == 0) //线程加入notEmpty条件等待集 notEmpty.await(); //非空,出队列 return dequeue(); } finally { //释放锁 lock.unlock(); } }

4、阻塞队列的应用:实现消费者-生产者模式

/** * @author 作者:徐剑 E-mail:anxu_2013@163.com * @version 创建时间:2016年3月20日 下午2:21:55 * 类说明:阻塞队列实现的消费者-生产者模式 */ public class Test { private int queueSize = 10; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { try { queue.take(); System.out.println("从队列取走一个元素,队列剩余" + queue.size() + "个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+ (queueSize - queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/e21ccd316451edbebb47566773b6ab4d.html