Guide
- (Udemy course) https://www.udemy.com/course/asynchronous-python-with-asyncio
- (detailed documentation) https://superfastpython.com/python-asyncio/
- (Medium) https://medium.com/better-programming/a-hands-on-guide-to-concurrency-in-python-with-asyncio-af33a795d808
- (Medium) https://medium.com/dev-bits/a-minimalistic-guide-for-understanding-asyncio-in-python-52c436c244ea
Code
    import asyncio
    import random
    from asyncio import Task
    from datetime import datetime


    async def io_bound_task(x: float):
        current_time = datetime.now().strftime("%H:%M:%S")
        print(f'{current_time}, x={x}')
        # Caution: calling time.sleep() here, as below, blocks the event loop,
        # so the tasks would no longer run concurrently.
        # time.sleep(1.0)
        await asyncio.sleep(1.0)
        return x ** 2


    async def action(inputs: list[float]):
        # Schedule one task per input, then wait for all of them.
        tasks: list[Task[float]] = [asyncio.create_task(io_bound_task(x)) for x in inputs]
        x_completed_result = await asyncio.gather(*tasks)
        for result in x_completed_result:
            print(result)


    if __name__ == '__main__':
        x_list = [random.random() for _ in range(20)]
        asyncio.run(action(x_list))

Explanation
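asyncio.gather() runs the tasks concurrently and returns their results in the order the tasks were passed in, so the 20 one-second tasks finish in roughly one second. asyncio.as_completed() and asyncio.wait_for() are also worth knowing: as_completed() hands back results in the order the tasks finish, and wait_for() puts a timeout on a single awaitable.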
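As a rough illustration of those two helpers, here is a minimal sketch. The `delay` parameter and the function names `in_completion_order` and `with_timeout` are assumptions made for this example and are not part of the code above.

```python
import asyncio


async def io_bound_task(x: float, delay: float) -> float:
    # Stand-in for an I/O call; `delay` is a hypothetical parameter for this sketch.
    await asyncio.sleep(delay)
    return x ** 2


async def in_completion_order() -> list[float]:
    # The delays are the reverse of the creation order, so the last task finishes first.
    tasks = [
        asyncio.create_task(io_bound_task(x, delay))
        for x, delay in [(1.0, 0.3), (2.0, 0.2), (3.0, 0.1)]
    ]
    results = []
    # as_completed() yields awaitables in the order the tasks finish.
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results


async def with_timeout() -> str:
    try:
        # wait_for() cancels the awaited coroutine once the timeout expires.
        await asyncio.wait_for(io_bound_task(4.0, 1.0), timeout=0.05)
        return "finished"
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(in_completion_order()))
    print(asyncio.run(with_timeout()))
```

Unlike gather(), which keeps input order, as_completed() is handy when each result should be processed as soon as it is ready.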
Handling blocking functions
    import asyncio

    import requests

    ENDPOINT = "https://catfact.ninja/fact"


    async def get_cat_fact():
        # requests.get() is blocking, so run it in the default executor
        # (a thread pool) to keep the event loop free.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, lambda: requests.get(ENDPOINT))
        return result.json().get('fact')
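On Python 3.9 and later, asyncio.to_thread() is a shorter way to push a blocking call onto the default thread pool. The sketch below assumes a hypothetical `blocking_call` that uses time.sleep() in place of a real network request, so it runs without network access.

```python
import asyncio
import time


def blocking_call(x: float) -> float:
    # Hypothetical stand-in for a blocking call such as requests.get().
    time.sleep(0.2)
    return x ** 2


async def run_blocking_concurrently(inputs: list[float]) -> list[float]:
    # Each call runs in a worker thread, so the event loop stays free
    # while the calls are sleeping. gather() keeps the input order.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_call, x) for x in inputs)
    )


if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(run_blocking_concurrently([1.0, 2.0, 3.0])))
    print(f"elapsed: {time.perf_counter() - start:.2f}s")
```

Because the three calls overlap in separate threads, the elapsed time should be close to a single call rather than the sum of all three, although the exact figure depends on the machine.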