生产者消费者模型

概念

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

这个阻塞队列就是用来给生产者和消费者解耦的。

如果缓冲区已经满了,则生产者线程阻塞;

如果缓冲区为空,那么消费者线程阻塞。

2.4 为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

优点

  1. 解耦

假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。

  1. 支持并发(concurrency)

生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。

使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。

  1. 支持忙闲不均

缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

线程池与生产消费者模式

Java中的线程池类其实就是一种生产者和消费者模式的实现方式,但是我觉得其实现方式更加高明。生产者把任务丢给线程池,线程池创建线程并处理任务,如果将要运行的任务数大于线程池的基本线程数就把任务扔到阻塞

队列里,这种做法比只使用一个阻塞队列来实现生产者和消费者模式显然要高明很多,因为消费者能够处理直接就处理掉了,这样速度更快,而生产者先存,消费者再取这种方式显然慢一些。

3 实现生产者消费者的三种模式

3.1 synchronized、wait和notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
//wait 和 notify
public class ProducerConsumerWithWaitNofity {
public static void main(String[] args) {
Resource resource = new Resource();
//生产者线程
ProducerThread p1 = new ProducerThread(resource);
ProducerThread p2 = new ProducerThread(resource);
ProducerThread p3 = new ProducerThread(resource);
//消费者线程
ConsumerThread c1 = new ConsumerThread(resource);
//ConsumerThread c2 = new ConsumerThread(resource);
//ConsumerThread c3 = new ConsumerThread(resource);

p1.start();
p2.start();
p3.start();
c1.start();
//c2.start();
//c3.start();
}
}
/**
* 公共资源类
* @author
*
*/
class Resource{//重要
//当前资源数量
private int num = 0;
//资源池中允许存放的资源数目
private int size = 10;

/**
* 从资源池中取走资源
*/
public synchronized void remove(){
if(num > 0){
num--;
System.out.println("消费者" + Thread.currentThread().getName() +
"消耗一件资源," + "当前线程池有" + num + "个");
notifyAll();//通知生产者生产资源
}else{
try {
//如果没有资源,则消费者进入等待状态
wait();
System.out.println("消费者" + Thread.currentThread().getName() + "线程进入等待状态");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 向资源池中添加资源
*/
public synchronized void add(){
if(num < size){
num++;
System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有"
+ num + "个");
//通知等待的消费者
notifyAll();
}else{
//如果当前资源池中有10件资源
try{
wait();//生产者进入等待状态,并释放锁
System.out.println(Thread.currentThread().getName()+"线程进入等待");
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
/**
* 消费者线程
*/
class ConsumerThread extends Thread{
private Resource resource;
public ConsumerThread(Resource resource){
this.resource = resource;
}
@Override
public void run() {
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
/**
* 生产者线程
*/
class ProducerThread extends Thread{
private Resource resource;
public ProducerThread(Resource resource){
this.resource = resource;
}
@Override
public void run() {
//不断地生产资源
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}

}

3.2 lock和condition的await、signalAll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用Lock 和 Condition解决生产者消费者问题
* @author tangzhijing
*
*/
public class LockCondition {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);

//生产者线程
ProducerThread2 producer1 = new ProducerThread2(resource);

//消费者线程
ConsumerThread2 consumer1 = new ConsumerThread2(resource);
ConsumerThread2 consumer2 = new ConsumerThread2(resource);
ConsumerThread2 consumer3 = new ConsumerThread2(resource);

producer1.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
/**
* 消费者线程
*/
class ConsumerThread2 extends Thread{
private Resource2 resource;
public ConsumerThread2(Resource2 resource){
this.resource = resource;
//setName("消费者");
}
public void run(){
while(true){
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
/**
* 生产者线程
* @author tangzhijing
*
*/
class ProducerThread2 extends Thread{
private Resource2 resource;
public ProducerThread2(Resource2 resource){
this.resource = resource;
setName("生产者");
}
public void run(){
while(true){
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
/**
* 公共资源类
* @author tangzhijing
*
*/
class Resource2{
private int num = 0;//当前资源数量
private int size = 10;//资源池中允许存放的资源数目
private Lock lock;
private Condition producerCondition;
private Condition consumerCondition;
public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {
this.lock = lock;
this.producerCondition = producerCondition;
this.consumerCondition = consumerCondition;

}
/**
* 向资源池中添加资源
*/
public void add(){
lock.lock();
try{
if(num < size){
num++;
System.out.println(Thread.currentThread().getName() +
"生产一件资源,当前资源池有" + num + "个");
//唤醒等待的消费者
consumerCondition.signalAll();
}else{
//让生产者线程等待
try {
producerCondition.await();
System.out.println(Thread.currentThread().getName() + "线程进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally{
lock.unlock();
}
}
/**
* 从资源池中取走资源
*/
public void remove(){
lock.lock();
try{
if(num > 0){
num--;
System.out.println("消费者" + Thread.currentThread().getName()
+ "消耗一件资源," + "当前资源池有" + num + "个");
producerCondition.signalAll();//唤醒等待的生产者
}else{
try {
consumerCondition.await();
System.out.println(Thread.currentThread().getName() + "线程进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}//让消费者等待
}
}finally{
lock.unlock();
}
}

}

3.3 使用阻塞队列实现的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

//使用阻塞队列BlockingQueue解决生产者消费者
public class BlockingQueueConsumerProducer {
public static void main(String[] args) {
Resource3 resource = new Resource3();
//生产者线程
ProducerThread3 p = new ProducerThread3(resource);
//多个消费者
ConsumerThread3 c1 = new ConsumerThread3(resource);
ConsumerThread3 c2 = new ConsumerThread3(resource);
ConsumerThread3 c3 = new ConsumerThread3(resource);

p.start();
c1.start();
c2.start();
c3.start();
}
}
/**
* 消费者线程
* @author tangzhijing
*
*/
class ConsumerThread3 extends Thread {
private Resource3 resource3;

public ConsumerThread3(Resource3 resource) {
this.resource3 = resource;
//setName("消费者");
}

public void run() {
while (true) {
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource3.remove();
}
}
}
/**
* 生产者线程
* @author tangzhijing
*
*/
class ProducerThread3 extends Thread{
private Resource3 resource3;
public ProducerThread3(Resource3 resource) {
this.resource3 = resource;
//setName("生产者");
}

public void run() {
while (true) {
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource3.add();
}
}
}
class Resource3{
private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);
/**
* 向资源池中添加资源
*/
public void add(){
try {
resourceQueue.put(1);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产一件资源," + "当前资源池有" + resourceQueue.size() +
"个资源");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 向资源池中移除资源
*/
public void remove(){
try {
resourceQueue.take();
System.out.println("消费者" + Thread.currentThread().getName() +
"消耗一件资源," + "当前资源池有" + resourceQueue.size()
+ "个资源");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}