Sangmun

python Producer-Consumer(생산자 소비자 문제) Threading 본문

개발

python Producer-Consumer(생산자 소비자 문제) Threading

상상2 2023. 3. 7. 22:32

Producer-consumer Problem(생산자 소비자 문제는) threading이나 프로세스 동기화 이슈와 관련된 computer science에서 자주 볼 수 있는 문제이다.

 

현재 글에서는 python의 Lock을 이용하여 Producer-comsumer problem을 어떻게 해결하는지 다루고자 한다.

다음과 같은 예제가 있다. 외부로부터 message를 수신하는 작업과 수신한 message를 db에 저장하는 작업이 있다. 이때 외부에서 들어오는 message는 언제 들어올지 모르며 때때로 너무 많은 양이 들어올 때가 있다. 이 message를 수신하는 작업이 Producer이다. 수신한 message를 db에 저장하는 작업 Consumer이며 작업의 속도가 상대적으로 느리다. 따라서 외부에서 들어오는 message를 누락 없이 제대로 관리를 하려면 Producer와 Comsumer가 동기화되어서 처리되어야 할 필요가 있다.

 

코드를 하나하나 설명해보겠다.

아래는 message를 생성하여 수신하는 상황을 가정한 producer 코드이며 10개의 message만 받고 종료하는 상황을 가정한 코드이다.

import random 

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

아래는 consumer 코드이며 producer가 message를 받으면 출력(db에 저장하는 작업을 가정)하며 SENTINEL object가 들어오면 출력은 하지 않는다.

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

아래는 main section이며 2개의 thread를 생성하여 producer작업과 consumer 작업을 실시한다.

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

 

가장 중요한 message가 전달되는 Pipeline class 코드로 Lock을 이용하여 Producer와 Consumer 간에 주고받는 message가 동기화되게 한다.

우선 클래스의 인스턴스가 생성되었을 때는 message가 아무것도 없음으로 consumer를 lock 하도록 한다.

이후 producer가 message를 생성할 때는 producer를 lock 하고 message를 저장한 이후 consumer를 release 하면서 consumer가 producer로부터 생성된 message에 접근을 할 수 있게 해 준다.

consumer는 get_message 함수를 통하여 우선 다시 consumer lock을 실시하고 producer로부터 생성된 message에 접근을 한다. 이후 message는 처리되었으므로 다시 producer release를 실시하여 producer가 message에 접근을 할 수 있는 상태로 만들어 준다.

class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

 

위의 코드는 producer와 consumer가 한 번의 하나의 메시지만을 동기화하도록 설정한 코드이며 실행을 하면 아래와 같이 message가 한 번에 하나씩만 처리되는 것을 확인할 수 있다.

$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22

Producer-Consumer Using Queue

위의 예제는 한번에 하나의 message만 처리할 수 있는 코드이며 여러 개의 message를 처리하고 싶으면 python의 Queue Class를 이용하여 구현이 가능하다.

 

변경된 사항은 thread의 event object를 사용한 것으로 producer와 consumer는 특정 event가 일어날 때까지 thread를 생산하며 소비한다. threading.Event는 하나의 thread가 어떠한 event가 발생했을 때 다른 thread에게 signal을 주는 것을 가능하게 한다.

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

 

producer 코드로 이전의 코드가 SENTINEL을 이용해서 작업의 끝을 알렸다면 이번에는 event.set()으로 확인하도록 만들었다.

def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")

consumer 코드로 event가 set이 되었거나 pipline이 빈 것이 아닌 이상 작업은 계속된다.

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")

마지막으로 Pipline class이며 Queue를 상속받았고 maxsize를 설정해 줌으로써 최대로 처리될 수 있는 양을 설정하였다.

간단하게 get과 set함수로 이전의 Lock의 구현사항을 대체하였다.

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

실행 결과는 아래와 같다. 최대로 설정한 queue의 사이즈만큼 동시에 message가 처리되는 것을 확인할 수 있다.

$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting

 

출처 : https://realpython.com/intro-to-python-threading/#producer-consumer-threading

 

An Intro to Threading in Python – Real Python

In this intermediate-level tutorial, you'll learn how to use threading in your Python programs. You'll see how to create threads, how to coordinate and synchronize them, and how to handle common problems that arise in threading.

realpython.com

 

'개발' 카테고리의 다른 글

udemy mongodb 강의 후기  (0) 2023.03.14
python 동시성에 대한 정리  (0) 2023.03.07
Python Thread의 개념과 사용예시  (0) 2023.03.07
Google cloud Compute Engine에서 인스턴스 만들기  (0) 2023.02.11
Docker compose  (0) 2023.02.04
Comments