美团面试题-Java 生产者消费者模式
请使用Java编写一个程序,实现生产者消费者模式。该程序应包含一个生产者线程和一个消费者线程, 它们共享一个固定大小的缓冲区。生产者线程负责向缓冲区中添加数据(生产数据),而消费者线程负责从缓冲区中取出数据(消费数据)。
要求:
缓冲区的大小固定为10。
生产者线程在缓冲区满时必须等待,直到有空间可用。
消费者线程在缓冲区空时必须等待,直到有数据可用。
使用Java的synchronized关键字和wait()、notify()、notifyAll()方法来实现线程间的同步。
生产者和消费者线程应交替工作,即生产者生产一个数据后,消费者消费一个数据,如此往复。
请编写一个完整的Java程序,包含生产者类、消费者类和缓冲区类,并展示如何创建和启动这些线程。
单生产者单消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class Main {
public static void main(String[] args) {
Buffer buffer = new Buffer();
Producer producer = new Producer(buffer);
Consumer consumer = new Consumer(buffer);
producer.start();
consumer.start();
}
}
class Buffer {
public final LinkedList<Integer> queue = new LinkedList<>();
public final int MAX_SIZE = 10;
}
class Producer extends Thread {
private final Buffer buffer;
private int value = 1;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (buffer) {
// 当缓冲区满时等待
while (buffer.queue.size() >= buffer.MAX_SIZE) {
try {
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 生产一个数据
buffer.queue.add(value);
System.out.println("生产者生产了: " + value + ",当前缓冲区大小: " + buffer.queue.size());
value++;
buffer.notifyAll(); // 通知消费者线程
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class Consumer extends Thread {
private final Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (buffer) {
// 当缓冲区空时等待
while (buffer.queue.isEmpty()) {
try {
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 消费一个数据
int value = buffer.queue.removeFirst();
System.out.println("消费者消费了: " + value + ",当前缓冲区大小: " + buffer.queue.size());
buffer.notifyAll(); // 通知生产者线程
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
多生产者多消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class Main {
public static void main(String[] args) {
Buffer buffer = new Buffer();
// 创建多个生产者和消费者线程
for (int i = 1; i <= 2; i++) {
new Producer(buffer, i).start();
}
for (int i = 1; i <= 2; i++) {
new Consumer(buffer, i).start();
}
}
}
class Buffer {
public final LinkedList<Integer> queue = new LinkedList<>();
public final int MAX_SIZE = 10;
// 控制交替执行
public boolean isProducerTurn = true;
}
class Producer extends Thread {
private final Buffer buffer;
private static int counter = 0; // 所有生产者共享
private final int id;
public Producer(Buffer buffer, int id) {
this.buffer = buffer;
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (buffer) {
while (!buffer.isProducerTurn || buffer.queue.size() == buffer.MAX_SIZE) {
try {
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
int value = counter++;
buffer.queue.add(value);
System.out.println("生产者[" + id + "] 生产了: " + value + ",缓冲区大小: " + buffer.queue.size());
buffer.isProducerTurn = false;
buffer.notifyAll();
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class Consumer extends Thread {
private final Buffer buffer;
private final int id;
public Consumer(Buffer buffer, int id) {
this.buffer = buffer;
this.id = id;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
synchronized (buffer) {
while (buffer.isProducerTurn || buffer.queue.isEmpty()) {
try {
buffer.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
int value = buffer.queue.removeFirst();
System.out.println("消费者[" + id + "] 消费了: " + value + ",缓冲区大小: " + buffer.queue.size());
buffer.isProducerTurn = true;
buffer.notifyAll();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
CC BY-NC-SA 4.0
许可协议,转载请注明出处!
本博客所有文章除特别声明外,均采用