Java中ReentrantLock应用

分类:N07_Java

标签:

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 使用 ReentrantLock + Condition 实现生产者-消费者模型
 * 通过两个 Condition 分别管理“缓冲区不满”和“缓冲区不空”的条件,避免无效唤醒
 */
public class ReentrantLockDemo {
    private static final int CAPACITY = 5;
    private static final LinkedList<Integer> buffer = new LinkedList<>();
    private static final ReentrantLock lock = new ReentrantLock();
    // 两个条件队列
    private static final Condition notFull = lock.newCondition();  // 缓冲区不满的条件
    private static final Condition notEmpty = lock.newCondition(); // 缓冲区不空的条件

    public static void produce() throws InterruptedException {
        int value = 0;
        while (true) {
            lock.lock();
            try {
                while (buffer.size() == CAPACITY) {
                    System.out.println(Thread.currentThread().getName() + " 缓冲区满,进入 notFull 等待队列...");
                    notFull.await();   // 释放锁并进入 notFull 的条件队列
                }
                buffer.add(value);
                System.out.println(Thread.currentThread().getName() + " 生产: " + value + ",缓冲区大小: " + buffer.size());
                value++;
                // 唤醒一个在 notEmpty 条件队列中等待的线程(消费者)
                notEmpty.signal();
                Thread.sleep(500);
            } finally {
                lock.unlock();
            }
        }
    }

    public static void consume() throws InterruptedException {
        while (true) {
            lock.lock();
            try {
                while (buffer.isEmpty()) {
                    System.out.println(Thread.currentThread().getName() + " 缓冲区空,进入 notEmpty 等待队列...");
                    notEmpty.await();  // 释放锁并进入 notEmpty 的条件队列
                }
                int data = buffer.removeFirst();
                System.out.println(Thread.currentThread().getName() + " 消费: " + data + ",缓冲区大小: " + buffer.size());
                // 唤醒一个在 notFull 条件队列中等待的线程(生产者)
                notFull.signal();
                Thread.sleep(1000);
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Thread producer1 = new Thread(() -> {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产者A");
        Thread producer2 = new Thread(() -> {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "生产者B");
        Thread consumer1 = new Thread(() -> {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "消费者A");
        Thread consumer2 = new Thread(() -> {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "消费者B");

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }
}


修改内容