分类:N07_Java
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用 ReentrantLock + Condition 实现生产者-消费者模型
* 通过两个 Condition 分别管理“缓冲区不满”和“缓冲区不空”的条件,避免无效唤醒
*/
public class ReentrantLockDemo {
private static final int CAPACITY = 5;
private static final LinkedList<Integer> buffer = new LinkedList<>();
private static final ReentrantLock lock = new ReentrantLock();
// 两个条件队列
private static final Condition notFull = lock.newCondition(); // 缓冲区不满的条件
private static final Condition notEmpty = lock.newCondition(); // 缓冲区不空的条件
public static void produce() throws InterruptedException {
int value = 0;
while (true) {
lock.lock();
try {
while (buffer.size() == CAPACITY) {
System.out.println(Thread.currentThread().getName() + " 缓冲区满,进入 notFull 等待队列...");
notFull.await(); // 释放锁并进入 notFull 的条件队列
}
buffer.add(value);
System.out.println(Thread.currentThread().getName() + " 生产: " + value + ",缓冲区大小: " + buffer.size());
value++;
// 唤醒一个在 notEmpty 条件队列中等待的线程(消费者)
notEmpty.signal();
Thread.sleep(500);
} finally {
lock.unlock();
}
}
}
public static void consume() throws InterruptedException {
while (true) {
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " 缓冲区空,进入 notEmpty 等待队列...");
notEmpty.await(); // 释放锁并进入 notEmpty 的条件队列
}
int data = buffer.removeFirst();
System.out.println(Thread.currentThread().getName() + " 消费: " + data + ",缓冲区大小: " + buffer.size());
// 唤醒一个在 notFull 条件队列中等待的线程(生产者)
notFull.signal();
Thread.sleep(1000);
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Thread producer1 = new Thread(() -> {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者A");
Thread producer2 = new Thread(() -> {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者B");
Thread consumer1 = new Thread(() -> {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者A");
Thread consumer2 = new Thread(() -> {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "消费者B");
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
}
}