Java BlockingQueue 示例
今天我们将研究 Java BlockingQueue。java.util.concurrent.BlockingQueue
它是一个 Java 队列,支持在检索和删除元素时等待队列变为非空的操作,以及在添加元素时等待队列中的空间可用操作。
Java BlockingQueue
Java BlockingQueue 不接受null
值,NullPointerException
如果您尝试在队列中存储空值,则会抛出异常。Java BlockingQueue 实现是线程安全的。所有排队方法本质上都是原子的,并使用内部锁或其他形式的并发控制。Java BlockingQueue 接口是 Java 集合框架的一部分,主要用于实现生产者消费者问题。我们不必担心等待 BlockingQueue 中生产者有可用空间或消费者有可用对象,因为它由 BlockingQueue 的实现类处理。Java 提供了几个 BlockingQueue 实现,例如、、ArrayBlockingQueue
等。在 BlockingQueue 中实现生产者消费者问题时,我们将使用 ArrayBlockingQueue 实现。以下是您应该知道的一些重要方法。LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
put(E e)
:此方法用于将元素插入队列。如果队列已满,则等待有空间可用。E take()
:此方法检索并从队列头部删除元素。如果队列为空,则等待元素可用。
现在让我们使用 java BlockingQueue 实现生产者消费者问题。
Java BlockingQueue 示例 - 消息
只是一个由生产者生成并添加到队列的普通 Java 对象。您也可以将其称为有效负载或队列消息。
package com.journaldev.concurrency;
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
}
Java BlockingQueue 示例 - 生产者
生产者类将创建消息并将其放入队列。
package com.journaldev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue<Message> queue;
public Producer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
//produce messages
for(int i=0; i<100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//adding exit message
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Java BlockingQueue 示例 - 消费者
消费者类将处理来自队列的消息,并在收到退出消息时终止。
package com.journaldev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<Message> queue;
public Consumer(BlockingQueue<Message> q){
this.queue=q;
}
@Override
public void run() {
try{
Message msg;
//consuming messages until exit message is received
while((msg = queue.take()).getMsg() !="exit"){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Java BlockingQueue 示例 - 服务
最后,我们必须为生产者和消费者创建 BlockingQueue 服务。此生产者消费者服务将创建固定大小的 BlockingQueue,并与生产者和消费者共享。此服务将启动生产者和消费者线程并退出。
package com.journaldev.concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
上述 java BlockingQueue 示例程序的输出如下所示。
Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...
生产者和消费者使用Java线程睡眠来延迟地生产和消费消息。