JAVA

[자바] Producer-Consumer Problem, BlockingQueue

감자b 2024. 12. 25. 00:26

생산자-소비자 문제란?

여러 스레드가 동시에 데이터를 생산 및 소비할 때 생기는 동시성 문제

  • 생산자 : 데이터를 생산하는 역할 → 네트워크에서 데이터를 받아오거나, 파일을 읽는 스레드
  • 소비자 : 데이터를 소비하는 역할 → 얻어온 데이터를 처리, 저장하는 스레드
  • 버퍼 : 생산된 데이터가 일시적으로 저장되는 한정된 크기의 공간으로 데이터가 저장되면 소비자가 해당 버퍼에서 가져옴 (공유 자원)

이해를 위해 다음 코드를 살펴보자.

public class Buffet1 implements Buffet {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public Buffet1(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {
        if (queue.size() == max) {
            System.out.println("[put] 큐가 가득 참 - 버리는 데이터 = " + data);
            return;
        }
        queue.offer(data);
    }
    @Override
    public synchronized String take() {
        if (queue.isEmpty()) {
            return null;
        }
        return queue.poll();
    }
    @Override
    public synchronized String toString() {
        return queue.toString();
    }
}
public class ProducerConsumerMain {
    public static void main(String[] args) throws InterruptedException {
        Buffet buffet = new BuffetV1(1); // 버퍼
        chefFirst(buffet); // 생산자 먼저 실행
        //consumerFirst(buffet); // 소비자 먼저 실행
    }

    private static void chefFirst(Buffet buffet) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        startChef(buffet, threads);
        startConsumer(buffet, threads);
        printAllState(buffet, threads);
    }

    private static void consumerFirst(Buffet buffet) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        startConsumer(buffet, threads);
        startChef(buffet, threads);
        printAllState(buffet, threads);
    }

    private static void startChef(Buffet buffet, List<Thread> threads) throws InterruptedException {
        String[] meals = {"스파게티", "피자", "햄버거"};
        int count = 1;
        for (String meal : meals) {
            Thread chef = new Thread(new Chef(buffet, meal), "chef" + count++);
            threads.add(chef);
            chef.start();
            Thread.sleep(100);
        }
    }

    private static void startConsumer(Buffet buffet, List<Thread> threads) throws InterruptedException {
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new Consumer(buffet), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            Thread.sleep(100);
        }
    }

    private static void printAllState(Buffet buffet, List<Thread> threads) {
        System.out.println("현재 큐 데이터: " + buffet);
        for (Thread thread : threads) {
            System.out.println(thread.getName() + "의 상태 " + thread.getState());
        }
    }
    static class Chef implements Runnable {
        private final Buffet queue;
        private final String request;

        public Chef(Buffet queue, String request) {
            this.queue = queue;
            this.request = request;
        }

        @Override
        public void run() {
            System.out.println("[요리사] 음식 제작 시작");
            queue.put(request);
            System.out.println("[요리사] " + request + " 음식 제작 완료, 대기 중인 음식" + queue);
        }
    }

    // 소비자
    static class Consumer implements Runnable {
        private final Buffet queue;

        public Consumer(Buffet queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            System.out.println("[소비자] 음식 가져오기");
            String take = queue.take();
            System.out.println("[소비자] " + take + " 음식 소비 완료");
        }
    }
}

출력 결과 - 생산자 우선

[요리사] 음식 제작 시작

[요리사] 스파게티 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - 버리는 음식 = 피자

[요리사] 피자 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - 버리는 음식 = 햄버거

[요리사] 햄버거 음식 제작 완료, 대기 중인 음식[스파게티]

[소비자] 음식 가져오기

[소비자] 스파게티 음식 소비 완료

[소비자] 음식 가져오기

[소비자] null 음식 소비 완료

[소비자] 음식 가져오기

[소비자] null 음식 소비 완료

현재 큐 데이터: []

chef1의 상태 TERMINATED

chef2의 상태 TERMINATED

chef3의 상태 TERMINATED

consumer1의 상태 TERMINATED

consumer2의 상태 TERMINATED

consumer3의 상태 TERMINATED

 

위 코드는 한 뷔페(버퍼)에 스파게티, 피자, 햄버거를 만드는 요리사(생산자)가 있고 이를 소비하는 소비자가 있음을 예로 들었다.

뷔페 내부의 큐는 임계 영역으로 synchronized 하여 동기화 하였다.

  • 생산자가 먼저 음식을 생산하는 경우
    • 요리사1 스레드가 모니터 락을 얻은 후 음식을 만들고 뷔페에 음식을 넣은 뒤 락을 반납 → 스파게티
    • 요리사2 역시 모니터 락을 얻은 후 음식 생성하는데 음식을 둘 곳이 없어 만든 음식을 버린 후 락을 반납.
    • 요리사3 역시 동일
    • 그 다음 소비자1 스레드 모니터 락을 얻고 뷔페에 음식(스파게티)을 가져온 뒤 락 반납.
    • 소비자2 역시 모니터 락을 얻고 음식을 가져오려는데 뷔페에 음식이 없고 있지도 않은 음식을 소비한 후 락 반납.
    • 소비자3 역시 동일
  • 소비자가 먼저 음식을 소비하는 경우
    • 소비자1은 모니터 락을 얻고 음식을 가져오려는데 뷔페에 음식이 없고 있지도 않은 음식을 소비한 후 락 반납.
    • 소비자2, 3 역시 동일
    • 요리사1은 모니터 락을 얻은 후 음식을 만들고 뷔페에 음식을 넣은 뒤 락을 반납. → 스파게티
    • 요리사2 역시 락을 얻은 후 음식 생성하는데 뷔페에 음식을 둘 곳이 없어 만든 음식을 버린 후 락을 반납.
    • 요리사3 역시 동일

그렇다면 이러한 문제를 해결하기 위해 요리사는 소비자가 음식을 가져가면 추가로 생산하도록 하고 소비자는 음식이 생길 때 대기한 후 가져가도록 변경해보자.

@Override
public synchronized void put(String data) {
	while (queue.size() == max) {
		try {
			System.out.println("[put] 큐가 가득 참 - 1초 대기 = " + data);
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			System.out.println("인터럽트 발생!");
		}
	}
	queue.offer(data);
}
@Override
public synchronized String take() {
	while (queue.isEmpty()) {
		try {
			System.out.println("큐에 음식이 없음 - 1초 대기");
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			System.out.println("인터럽트 발생!");
		}
	}
	return queue.poll();
}

출력 결과 - 생산자 우선

[요리사] 음식 제작 시작

[요리사] 스파게티 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - 1초 대기 = 피자

[요리사] 음식 제작 시작 [소비자]

음식 가져오기 [소비자]

음식 가져오기 [소비자]

음식 가져오기

[put] 큐가 가득 참 - 1초 대기 = 피자

[put] 큐가 가득 참 - 1초 대기 = 피자

...

 

위 코드와 같이 변경하면 무한 루프에 빠지게 된다.

  • 생산자가 먼저 음식을 생산하는 경우
    • 요리사1이 정상적으로 음식을 생산하고 큐에 넣은 뒤 락 반납.
    • 요리사2가 락을 얻고 음식을 생산하는데 뷔페가 가득찬 것을 보고 소비가 될 때 까지 1초 대기한다.
      • 이 때 RUNNABLE → TIMED_WAITING 상태가 되고 1초마다 RUNNABLE 상태로 돌아와서 음식이 소비되었는지 확인한다.
      • 락을 반납하지 않고 계속해서 확인
    • 그리고 요리사3이 락을 얻으려는데 요리사2가 가져가서 BLOCKED 상태로 대기한다.
    • 소비자1, 2, 3 역시 마찬가지로 BLOCKED 상태로 대기.

소비자들이 소비를 못하는데 요리사만 소비했는지 확인하는 문제가 발생함.

 

  • 소비자가 먼저 음식을 소비하는 경우
    • 소비자1이 락을 얻고 음식을 소비하려는데 음식이 없어 1초 대기
      • 이 때 RUNNABLE → TIMED_WAITING 상태가 되고 1초마다 RUNNABLE 상태로 돌아와서 음식이 생산되었는지 확인한다.
      • 락을 반납하지 않고 계속해서 확인
    • 소비자2, 3, 생산자1, 2, 3 모두 락이 없어 BLOCKED 상태로 대기

생산자가 음식을 생산하지 못하는데 소비자만 계속 생산되었는지 확인하는 문제가 발생

 

위 문제를 해결하려면 생산자는 음식이 생산되었음을 소비자에게 알리고, 소비자는 음식을 소비했음을 생산자에게 알려야 한다.

참고 : BLOCKED 상태 스레드들은 자바 내부의 락 대기 집합에서 관리가 된다.


wait(), notify()

위 문제 해결을 위해 Object 객체는 wait(), notify() 메서드를 지원한다.

  • wait()
    • 현재 스레드가 가지고 있는 락을 반납하며 WAITING 상태로 대기.
    • 현재 스레드가 synchronized 내부에서 락을 가지고 있을 때만 호출 가능
  • notify()
    • 대기 중인 스레드 중 하나를 깨우고(순서는 알 수 없음), 일어난 스레드는 락을 얻을 기회를 가짐
    • synchronized 내부에서 호출되어야 한다.
  • notifyAll()
    • notify()와 동일하지만 대기 중인 스레드 모두를 깨운 뒤 일어난 모든 스레드가 락을 얻을 기회를 가짐

그렇다면 wait(), notify()를 통해 위 문제를 해결해보자.

    @Override
    public synchronized void put(String data) {
        while (queue.size() == max) {
            try {
                System.out.println("[put] 큐가 가득 참 - wait() 음식 = " + data);
                wait(); // RUNNABLE -> WAITING, 락 반납
                System.out.println("[put] 생산자 깨어남");
            } catch (InterruptedException e) {
                System.out.println("인터럽트 발생!");
            }
        }
        queue.offer(data);
        System.out.println("[put] 요리사 데이터 저장 후 알림 - notify()");
        notify(); // 대기 중인 스레드, WAIT -> BLOCKED 
        // notifyAll(); // 대기 중인 모든 스레드, WAIT -> BLOCKED
    }
    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            try {
                System.out.println("[take] 큐에 음식이 없음 - wait()");
                wait();
                System.out.println("[take] 소비자 깨어남");
            } catch (InterruptedException e) {
                System.out.println("인터럽트 발생!");
            }
        }
        String meal = queue.poll();
        System.out.println("[take] 소비자 음식 얻은 후 알림 - notify()");
        notify(); // 대기 중인 스레드, WAIT -> BLOCKED
        // notifyAll(); // 대기 중인 스레드, WAIT -> BLOCKED
        return meal;
    }

출력 결과 - 생산자 우선

[요리사] 음식 제작 시작

[put] 요리사 데이터 저장 후 알림 - notify()

[요리사] 스파게티 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - wait() 음식 = 피자

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - wait() 음식 = 햄버거

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - notify()

[put] 생산자 깨어남

[put] 요리사 데이터 저장 후 알림 - notify()

[요리사] 피자 음식 제작 완료, 대기 중인 음식[피자]

[put] 생산자 깨어남

[put] 큐가 가득 참 - wait() 음식 = 햄버거

[소비자] 스파게티 음식 소비 완료

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - notify()

[소비자] 피자 음식 소비 완료

[put] 생산자 깨어남

[put] 요리사 데이터 저장 후 알림 - notify()

[요리사] 햄버거 음식 제작 완료, 대기 중인 음식[햄버거]

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - notify()

[소비자] 햄버거 음식 소비 완료

현재 큐 데이터: []

chef1의 상태 TERMINATED

chef2의 상태 TERMINATED

chef3의 상태 TERMINATED

consumer1의 상태 TERMINATED

consumer2의 상태 TERMINATED

consumer3의 상태 TERMINATED

 

synchronized 내부에서 wait() 호출 시 스레드는 WAITING 상태에 들어가며, 대기 상태에 들어간 스레드는 대기 집합(wait set)에서 관리가 되는데 모든 객체는 각자 대기 집합을 가지고 있다.

  • 생산자가 먼저 음식을 생산하는 경우
    • 요리사1이 락을 얻은 후 뷔페에 음식을 저장한 후 notify()를 통해 스레드 대기 집합에 생산되었다는 것을 알린다. → 대기 집합에 아무 스레드도 없으므로 지나감.
    • 요리사1이 작업을 마치고 락 반납.
    • 마찬가지로 요리사2가 락을 얻은 뒤 음식을 생산 후 뷔페에 음식을 저장하려는데 뷔페가 가득차서 wait() 호출 → 요리사2 WAITING 상태로 스레드 대기 집합에서 관리되며 락을 반납.
    • 요리사3 역시 동일 → 요리사3 WAITING 상태
    • 소비자1이 락을 얻고 음식을 소비한 후 notify()를 호출한다.
    • 대기 집합에 있는 요리사2, 3 스레드 중 요리사2를 깨운다고 가정. (실제 순서는 알 수 없음)
    • 요리사2는 대기 집합에서 나오지만 여전히 임계 영역 내부에 있으며 아직 락이 없으므로 BLOCKED 상태로 대기
    • 소비자1이 모든 작업을 마치고 락을 반납
    • 요리사2가 락을 얻고 BLOCKED → RUNNABLE 상태로 전환되며 wait() 이후의 코드를 실행하면서 뷔페에 음식을 넣고 notify() 호출하는데 여기서 스레드 대기 집합에 요리사3을 깨움.
    • 요리사3은 WAITING에서 BLOCKED 상태가 된다.
    • 요리사2가 모든 작업을 마치고 락을 반납.
    • 요리사3이 락을 얻고 작업을 하는데 큐가 가득 차있음(생산자가 생산자를 깨움).
      • 다시 wait()를 호출하며 WAITING 상태로 스레드 대기 집합에 들어가고 락 반납.
    • 소비자2가 락을 얻고 큐에 있는 음식을 소비 후 notify() 호출
    • 요리사3이 WAITING → BLOCKED 상태가 되며 락 반납까지 대기
    • 소비자2가 모든 작업을 마치고 락을 반납
    • 요리사3은 wait() 이후의 코드를 실행. 뷔페에 음식을 집어넣고 notify()를 호출하는데 대기 집합에는 아무 것도 없으므로 지나가고 모든 작업 처리 후 락 반납.
    • 소비자3이 락을 얻고 음식 소비 후 notify() 호출하지만 대기 집합에 아무 것도 없어 그냥 지나감.
    • 모든 작업 처리 후 락 반납.

여기서 위 코드를 보면 요리사가 소비자를 깨워야 하는데 요리사가 요리사를 깨우는 문제가 있다.

즉 스레드 대기 집합에서 요리사와 소비자를 모두 관리하기 때문에 이러한 문제가 발생하며

또한 notify()가 어떤 스레드를 깨우게 될 지 모르기 때문에 대기 집합에 있는 스레드 중 영원히 깨어나지 못하는 스레드 기아 문제가 발생할 수도 있다.

스레드 기아 문제 → notifyAll()로 해결이 가능.

모든 스레드를 대기 집합에서 꺼내어 BLOCKED 상태로 바꾸고 조건을 만족하지 못하는 스레드는 다시 대기 집합으로 들어감. → 아무리 늦게 호출되도 마지막에 남은 스레드는 락을 얻을 수 있지만 성능 저하.


Lock Condition

위 문제를 해결하려면 스레드 대기 집합을 생산자, 소비자 전용으로 구분하면 된다. Lock 인터페이스 내부의 Condition 객체를 통해 스레드 대기 집합을 구분할 수 있다.

Condition → (ReentrantLock을 사용하는 스레드가 대기하는 대기 집합)

public class BuffetV4 implements Buffet {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock();
    private final Condition chefCond = lock.newCondition();
    private final Condition consumerCond = lock.newCondition();
    private final int max;

    public BuffetV4(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                try {
                    System.out.println("[put] 큐가 가득 참 - await() 음식 = " + data);
                    chefCond.await();
                    System.out.println("[put] 생산자 깨어남");
                } catch (InterruptedException e) {
                    System.out.println("인터럽트 발생!");
                }
            }
            queue.offer(data);
            System.out.println("[put] 요리사 데이터 저장 후 알림 - signal()");
            consumerCond.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    System.out.println("[take] 큐에 음식이 없음 - await()");
                    consumerCond.await();
                    System.out.println("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    System.out.println("인터럽트 발생!");
                }
            }
            String meal = queue.poll();
            System.out.println("[take] 소비자 음식 얻은 후 알림 - signal()");
            chefCond.signal();
            return meal;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public synchronized String toString() {
        return queue.toString();
    }
}

출력 결과

[요리사] 음식 제작 시작

[put] 요리사 데이터 저장 후 알림 - signal()

[요리사] 스파게티 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - await() 음식 = 피자

[요리사] 음식 제작 시작

[put] 큐가 가득 참 - await() 음식 = 햄버거

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - signal()

[put] 생산자 깨어남

[put] 요리사 데이터 저장 후 알림 - signal()

[요리사] 피자 음식 제작 완료, 대기 중인 음식[피자]

[소비자] 스파게티 음식 소비 완료

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - signal()

[소비자] 피자 음식 소비 완료

[put] 생산자 깨어남

[put] 요리사 데이터 저장 후 알림 - signal()

[요리사] 햄버거 음식 제작 완료, 대기 중인 음식[햄버거]

[소비자] 음식 가져오기

[take] 소비자 음식 얻은 후 알림 - signal()

[소비자] 햄버거 음식 소비 완료

현재 큐 데이터: []

chef1의 상태 TERMINATED

chef2의 상태 TERMINATED

chef3의 상태 TERMINATED

consumer1의 상태 TERMINATED

consumer2의 상태 TERMINATED

consumer3의 상태 TERMINATED

 

chefCond → 생산자 스레드 대기 집합

consumerCond → 소비자 스레드 대기 집합

  • wait() → condition.await()
  • notify() → condition.signal()

생산자가 음식을 뷔페에 더 넣을 수 없으면 chefCond.await()를 호출하여 생산자가 chefCond에서 대기하도록 하고 음식을 넣는다면 consumerCond.signal()을 호출하여 소비자 스레드 대기집합 중 하나를 깨우도록 하였다.

마찬가지로 소비자가 음식을 꺼낼 수 없다면 consumerCond.await()를 호출하여 소비자가 소비자 스레드 대기 집합에 대기하도록 하였고 음식을 꺼냈다면 chefCond.signal()을 호출하여 생산자 스레드 대기집합 중 하나를 깨우도록 하였다.

Condition.signal() → 일반적으로 대기 중인 스레드를 FIFO 순서로 깨우며 ReentrantLock을 가지고 있는 스레드가 호출해야 한다.

 

ReentrantLock 역시 내부에 락과 락 획득을 대기하는 스레드를 관리하는 대기 큐가 존재

  • await()를 호출하여 대기할 경우
  • WAITING 상태로 condition 객체의 스레드 대기 공간에서 대기하며 signal()이 호출되었을 때 해당 공간에서 빠져나감
  • lock.lock()을 호출했을 때 락이 없을 경우
  • ReentrantLock의 대기 큐에서 관리 (synchronized에서 BLOCKED 상태 큐를 관리하는 락 관리 집합과 비슷하지만 여기서는 WAITING 상태로 대기) 다른 스레드가 unlock()을 했을 때 대기 큐를 빠져나감

BlockingQueue

위에서 생산자-소비자 문제를 해결하는 뷔페(버퍼)를 만들어 보았다.

자바에서는 위에서 구현한 뷔페처럼 특정 조건이 만족될 때까지 스레드의 작업을 차단하는 자료구조를 제공한다.

 

BlockingQueue.put() → 큐가 가득차면 공간이 생길 때 까지 데이터를 추가하는 작업을 차단

BlockingQueue.take() → 큐가 비어있으면 데이터가 들어올 때 까지 큐에서 데이터를 가져오는 작업을 차단

 

BlockingQueue 인터페이스의 구현체

  • ArrayBlockingQueue : 배열 기반으로 구현, 버퍼의 크기가 고정.
  • LinkedBlockingQueue : 링크 기반으로 구현, 버퍼의 크기를 고정할 수도, 무한하게 사용도 가능
public class ProducerConsumerMain2 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffet = new ArrayBlockingQueue<>(1);
        chefFirst(buffet); // 생산자 먼저 실행
        //consumerFirst(buffet); // 소비자 먼저 실행
    }

    private static void chefFirst(BlockingQueue<String> buffet) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        startChef(buffet, threads);
        startConsumer(buffet, threads);
        printAllState(buffet, threads);
    }

    private static void consumerFirst(BlockingQueue<String> buffet) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        startConsumer(buffet, threads);
        startChef(buffet, threads);
        printAllState(buffet, threads);
    }

    private static void startChef(BlockingQueue<String> buffet, List<Thread> threads) throws InterruptedException {
        String[] meals = {"스파게티", "피자", "햄버거"};
        int count = 1;
        for (String meal : meals) {
            Thread chef = new Thread(new Chef(buffet, meal), "chef" + count++);
            threads.add(chef);
            chef.start();
            Thread.sleep(100);
        }
    }

    private static void startConsumer(BlockingQueue<String> buffet, List<Thread> threads) throws InterruptedException {
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new Consumer(buffet), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            Thread.sleep(100);
        }
    }

    private static void printAllState(BlockingQueue<String> buffet, List<Thread> threads) {
        System.out.println("현재 큐 데이터: " + buffet);
        for (Thread thread : threads) {
            System.out.println(thread.getName() + "의 상태 " + thread.getState());
        }
    }
    static class Chef implements Runnable {
        private final BlockingQueue<String> queue;
        private final String request;

        public Chef(BlockingQueue<String> queue, String request) {
            this.queue = queue;
            this.request = request;
        }

        @Override
        public void run() {
            try {
                System.out.println("[요리사] 음식 제작 시작");
                queue.put(request);
                System.out.println("[요리사] " + request + " 음식 제작 완료, 대기 중인 음식" + queue);
            } catch (InterruptedException e) {
                System.out.println("인터럽트 발생!");
            }
        }
    }

    // 소비자
    static class Consumer implements Runnable {
        private final BlockingQueue<String> queue;

        public Consumer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                System.out.println("[소비자] 음식 가져오기");
                String take = queue.take();
                System.out.println("[소비자] " + take + " 음식 소비 완료");
            } catch (InterruptedException e) {
                System.out.println("인터럽트 발생!");
            }
        }
    }
}

출력 결과

[요리사] 음식 제작 시작

[요리사] 스파게티 음식 제작 완료, 대기 중인 음식[스파게티]

[요리사] 음식 제작 시작

[요리사] 음식 제작 시작

[소비자] 음식 가져오기

[요리사] 피자 음식 제작 완료, 대기 중인 음식[피자]

[소비자] 스파게티 음식 소비 완료

[소비자] 음식 가져오기

[소비자] 피자 음식 소비 완료

[요리사] 햄버거 음식 제작 완료, 대기 중인 음식[햄버거]

[소비자] 음식 가져오기

[소비자] 햄버거 음식 소비 완료

현재 큐 데이터: []

chef1의 상태 TERMINATED

chef2의 상태 TERMINATED

chef3의 상태 TERMINATED

consumer1의 상태 TERMINATED

consumer2의 상태 TERMINATED

consumer3의 상태 TERMINATED

 

즉 우리가 만든 뷔페(버퍼)를 사용하지 않고 BlockQueue를 사용해도 동일한 결과가 나오는 것을 확인할 수 있다.

 

BlockingQueue의 추가 기능

Opertional Throws Exception Special Value Blocks Times Out
추가 add(e) offer(e) put(e) offer(e, time, unit)
삭제 remove() poll() take() poll(time, unit)
관찰 element() peek() x x

Throws Exception - 대기시 예외를 던짐

  • add : 큐에 요소를 추가하지만 큐가 가득차면 예외
  • remove : 큐가 요소를 제거하지만 비어있으면 예외
  • element : 큐의 머리 요소를 반환하지만 제거하지는 않으며 비어있을 시 예외

Special Value - 대기시 즉시 반환

  • offer : 지정된 요소를 큐에 추가, 큐가 가득 차면 false 반환
  • poll() : 큐에서 요소를 제거하고 반환. 큐가 비어 있을 땐 null 반환
  • peek() : 큐의 머리 요소를 반환하지만 제거하지는 않으며 비어 있을 시 null 반환

Blocks - 대기

  • put : 요소를 큐에 추가할 때까지 대기. 큐가 가득 차면 공간이 생길 때까지 대기
  • take : 큐에서 요소를 제거하고 반환. 큐가 비어 있으면 요소가 생길 때까지 대기

Times Out - 시간 대기

  • offer : 지정된 요소를 큐에 추가, 만약 가득차면 지정된 시간 동안 큐가 비워지기를 기다리다가 지정된 시간이 넘어가면 false 반환
  • poll : 큐에서 요소를 제거하고 반환. 큐에 요소가 없다면 지정된 시간 동안 요소가 생기는 것을 기다리다가 지정된 시간이 넘어가면 null 반환