美团面试题-Java 生产者消费者模式

请使用Java编写一个程序,实现生产者消费者模式。该程序应包含一个生产者线程和一个消费者线程, 它们共享一个固定大小的缓冲区。生产者线程负责向缓冲区中添加数据(生产数据),而消费者线程负责从缓冲区中取出数据(消费数据)。

要求:

  1. 缓冲区的大小固定为10。

  2. 生产者线程在缓冲区满时必须等待,直到有空间可用。

  3. 消费者线程在缓冲区空时必须等待,直到有数据可用。

  4. 使用Java的synchronized关键字和wait()、notify()、notifyAll()方法来实现线程间的同步。

  5. 生产者和消费者线程应交替工作,即生产者生产一个数据后,消费者消费一个数据,如此往复。

  6. 请编写一个完整的Java程序,包含生产者类、消费者类和缓冲区类,并展示如何创建和启动这些线程。

单生产者单消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class Main {
    public static void main(String[] args) {
        Buffer buffer = new Buffer();

        Producer producer = new Producer(buffer);
        Consumer consumer = new Consumer(buffer);

        producer.start();
        consumer.start();
    }
}

class Buffer {
    public final LinkedList<Integer> queue = new LinkedList<>();
    public final int MAX_SIZE = 10;
}

class Producer extends Thread {
    private final Buffer buffer;
    private int value = 1;

    public Producer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            synchronized (buffer) {
                // 当缓冲区满时等待
                while (buffer.queue.size() >= buffer.MAX_SIZE) {
                    try {
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                // 生产一个数据
                buffer.queue.add(value);
                System.out.println("生产者生产了: " + value + ",当前缓冲区大小: " + buffer.queue.size());
                value++;

                buffer.notifyAll(); // 通知消费者线程
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

class Consumer extends Thread {
    private final Buffer buffer;

    public Consumer(Buffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            synchronized (buffer) {
                // 当缓冲区空时等待
                while (buffer.queue.isEmpty()) {
                    try {
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                // 消费一个数据
                int value = buffer.queue.removeFirst();
                System.out.println("消费者消费了: " + value + ",当前缓冲区大小: " + buffer.queue.size());

                buffer.notifyAll(); // 通知生产者线程
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

多生产者多消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class Main {
    public static void main(String[] args) {
        Buffer buffer = new Buffer();

        // 创建多个生产者和消费者线程
        for (int i = 1; i <= 2; i++) {
            new Producer(buffer, i).start();
        }

        for (int i = 1; i <= 2; i++) {
            new Consumer(buffer, i).start();
        }
    }
}

class Buffer {
    public final LinkedList<Integer> queue = new LinkedList<>();
    public final int MAX_SIZE = 10;

    // 控制交替执行
    public boolean isProducerTurn = true;
}

class Producer extends Thread {
    private final Buffer buffer;
    private static int counter = 0; // 所有生产者共享
    private final int id;

    public Producer(Buffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            synchronized (buffer) {
                while (!buffer.isProducerTurn || buffer.queue.size() == buffer.MAX_SIZE) {
                    try {
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                int value = counter++;
                buffer.queue.add(value);
                System.out.println("生产者[" + id + "] 生产了: " + value + ",缓冲区大小: " + buffer.queue.size());

                buffer.isProducerTurn = false;
                buffer.notifyAll();
            }

            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

class Consumer extends Thread {
    private final Buffer buffer;
    private final int id;

    public Consumer(Buffer buffer, int id) {
        this.buffer = buffer;
        this.id = id;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            synchronized (buffer) {
                while (buffer.isProducerTurn || buffer.queue.isEmpty()) {
                    try {
                        buffer.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                int value = buffer.queue.removeFirst();
                System.out.println("消费者[" + id + "] 消费了: " + value + ",缓冲区大小: " + buffer.queue.size());

                buffer.isProducerTurn = true;
                buffer.notifyAll();
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
本文作者:
本文链接: https://hgnulb.github.io/blog/2025/25314462
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处!