ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • ProcessPoolExecutor
    python/응용 2022. 10. 29. 18:39

    https://medium.com/@mecha-mind/think-twice-before-using-asyncio-in-python-7683472cb7a3

    import math
    import random
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    
    def square_array(arr: list[int]) -> list[int]:
        """Return a new list holding the square of each element of ``arr``."""
        squared: list[int] = []
        for value in arr:
            squared.append(value ** 2)
        return squared
    
    
    def square_array_in_parallel(arr: list[int], n: int = 4, m: int = 1000) -> list[int]:
        """Square every element of ``arr`` using a pool of worker processes.

        The input is cut into at most ``m`` contiguous sub-arrays, each
        sub-array is squared by ``square_array`` in a worker process, and the
        partial results are concatenated in their original order.

        Args:
            arr: Integers to square.
            n: Number of worker processes.
            m: Maximum number of sub-arrays the array is divided into.

        Returns:
            A new list holding the square of each element of ``arr``, in order.

        Raises:
            ValueError: If ``m`` is smaller than 1.
        """
        if m < 1:
            # A non-positive m used to end in ZeroDivisionError (m == 0) or in
            # a silently empty result (m < 0, because range(m) is empty).
            raise ValueError(f"m must be at least 1, got {m}")
        if not arr:
            # Nothing to compute; skip the cost of starting worker processes.
            return []

        # batch is the size of one sub-array.
        batch: int = math.ceil(len(arr) / m)

        # Step through the array in strides of `batch` so that no empty
        # sub-arrays are sent to the workers when m exceeds len(arr).
        chunks: list[list[int]] = [
            arr[start:start + batch] for start in range(0, len(arr), batch)
        ]

        out: list[int] = []
        with ProcessPoolExecutor(n) as executor:
            # map() yields results in submission order, so extending `out`
            # sequentially rebuilds the original ordering.
            for squared in executor.map(square_array, chunks):
                out.extend(squared)

        return out
    
    
    if __name__ == '__main__':
        # Benchmark input: 100 million random integers drawn from 1..999
        # (range(1, 1000) excludes its upper bound).
        arr: list[int] = random.choices(range(1, 1000), k=100_000_000)

        # Baseline: square everything in the current process.
        t0 = time.perf_counter()
        square_array(arr)
        elapsed = time.perf_counter() - t0
        print(elapsed)

        # Same work, spread over worker processes.
        t0 = time.perf_counter()
        square_array_in_parallel(arr)
        elapsed = time.perf_counter() - t0
        print(elapsed)

     

     

    Using multiprocessing.Pool

    import multiprocessing
    import os
    import random
    import time
    
    from datetime import datetime
    
    
    def cpu_bound_task(x: float) -> float:
        """Log the call, pause for one second, then return ``x`` squared.

        The log line printed to stdout carries the current wall-clock time
        (HH:MM:SS), the id of the process executing the call, and the input.
        """
        stamp = f'{datetime.now():%H:%M:%S}'
        worker_pid = os.getpid()
        print(f'{stamp} pid={worker_pid}, x={x}')

        # The one-second sleep stands in for the expensive part of the task.
        time.sleep(1.0)
        squared = x ** 2
        return squared
    
    
    if __name__ == '__main__':
        # Twenty random inputs, squared by a pool of four worker processes.
        inputs = []
        for _ in range(20):
            inputs.append(random.random())

        with multiprocessing.Pool(processes=4) as workers:
            squares: list[float] = workers.map(cpu_bound_task, inputs)

        # Results arrive in the same order as the inputs.
        for value in squares:
            print(value)

    'python > 응용' 카테고리의 다른 글

    ThreadPoolExecutor  (0) 2023.02.09
    AsyncIO  (0) 2022.10.29
    Raw SQL & SQLAlchemy 예제  (0) 2022.07.04
    requests  (0) 2022.05.31
    json 읽기 / 쓰기  (0) 2022.05.31

    댓글

Designed by Tistory.