import logging
import time
from queue import Queue
from threading import Thread
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
queue = Queue(3)
consumer_count = 2
class Consumer(Thread):
def run(self) -> None:
while True:
value = queue.get(block=True)
if value is None:
logger.info(f"Recv poison pill, terminate Consumer-{self.name}")
break
logger.info(f"Consumer-{self.name}: {value}")
// IO bound task
time.sleep(0.5)
class Producer(Thread):
def run(self) -> None:
for i in range(10):
queue.put(i)
logger.info(f"Producer: {i}")
# time.sleep(0.1)
# poison pill
for i in range(consumer_count):
queue.put(None)
if __name__ == '__main__':
producers = [Producer()]
consumers = [Consumer() for _ in range(consumer_count)]
# starts producer
for p in producers:
p.start()
# starts consumer
for c in consumers:
c.start()
for p in producers:
p.join()
for c in consumers:
c.join()
logger.info("------ Done")
ChatGPT 가 만들어 준 코드
import queue
import threading
# Define a worker function that will consume requests from the queue
def worker(q):
while True:
request = q.get()
# Process the request here
print(f"Processing request {request}")
q.task_done()
# Create a queue to hold requests
q = queue.Queue()
# Create multiple worker threads to consume requests from the queue
for i in range(4):
t = threading.Thread(target=worker, args=(q,))
t.daemon = True
t.start()
# Add requests to the queue
for request in range(10):
q.put(request)
# Wait for all requests to be processed
q.join()