Python 제너레이터와 코루틴을 활용한 고급 기법
Wenhao Wang
Dev Intern · Leapcell

비동기 Python 프로그래밍 소개
현대 소프트웨어 개발 환경에서 효율성과 응답성은 매우 중요합니다. 작업이 순차적으로 실행되는 전통적인 동기 프로그래밍은 특히 네트워크 요청이나 파일 접근과 같은 I/O 바운드 작업에서 병목 현상을 일으킬 수 있습니다. 바로 여기서 비동기 프로그래밍이 빛을 발하며, 프로그램이 메인 실행 스레드를 차단하지 않고 여러 작업을 동시에 수행할 수 있도록 합니다. Python은 효율적이고 확장 가능한 비동기 애플리케이션 구축의 기본이 되는 강력한 제너레이터 및 코루틴과 같은 구문을 제공합니다. 고급 사용법을 이해하면 복잡한 작업을 처리하고, 정교한 데이터 처리 파이프라인을 구축하며, 애플리케이션 성능을 크게 향상시킬 수 있는 새로운 가능성을 열어줍니다. 이 글은 Python 제너레이터와 코루틴의 고급 기법을 자세히 살펴보고, 이를 활용하여 더 우아하고 동시적이며 고성능의 코드를 작성하는 방법을 보여줍니다.
동시 실행의 핵심 개념
고급 활용 사례를 자세히 살펴보기 전에, 논의의 기반이 되는 핵심 개념을 간략히 복습해 보겠습니다.
- 제너레이터(Generator): 이터레이터(iterator) 객체를 반환하는 특수한 유형의 함수입니다.
yield
키워드를 사용하여 실행을 일시 중지하고 값을 내보내며,next()
를 호출하면 중단된 지점에서 다시 시작합니다. 제너레이터는 전체 목록을 메모리에 구축하는 대신 요청 시 값을 생성하므로 메모리 효율적입니다. - 코루틴(Coroutine): 서브루틴의 일반화입니다. 서브루틴과 달리 코루틴은 실행을 일시 중지했다가 나중에 중단된 지점에서 다시 시작할 수 있습니다. Python에서는 제너레이터를 코루틴으로 사용할 수 있으며, 특히
yield from
구문을 사용하면 하위 제너레이터에 위임할 수 있습니다. Python의async
/await
키워드는asyncio
프레임워크 내에서 코루틴을 정의하고 작업하는 데 더 명시적이고 전용적인 구문을 제공합니다. - 이벤트 루프(Event Loop): 비동기 시스템의 핵심입니다. 다양한 작업을 모니터링하고 준비가 되었을 때 실행되도록 예약하여 코루틴의 실행 흐름을 효과적으로 관리합니다.
- 비동기 I/O(Async I/O): 프로그램이 I/O 작업이 완료되기를 기다리는 동안 다른 작업을 계속할 수 있도록 허용하는 입력/출력 처리의 한 형태입니다. 이는 논블로킹 작업에 중요합니다.
고급 제너레이터 패턴
제너레이터는 단순한 반복 작업용이 아니라 강력한 데이터 처리 파이프라인을 구축하는 데 사용될 수 있습니다.
제너레이터를 이용한 데이터 파이프라이닝
대규모 로그 파일을 처리해야 하는 시나리오를 생각해 봅시다. 줄을 필터링하고, 특정 정보를 추출하고, 그런 다음 형식을 지정해야 합니다. 연결된 제너레이터 표현식이나 함수를 사용하면 이를 효율적으로 수행할 수 있습니다.
import re def read_log_file(filepath): """제너레이터를 사용하여 로그 파일에서 줄을 읽습니다.""" with open(filepath, 'r') as f: for line in f: yield line.strip() def filter_errors(lines): """'ERROR'를 포함하는 줄을 필터링합니다.""" for line in lines: if "ERROR" in line: yield line def extract_timestamps(error_lines): """오류 줄에서 타임스탬프를 추출합니다.""" timestamp_pattern = re.compile(r"[\[](\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})[ \]]") for line in error_lines: match = timestamp_pattern.search(line) if match: yield match.group(1) # 예시 사용법 # 데모를 위한 더미 로그 파일 생성 with open('sample.log', 'w') as f: f.write("[2023-10-26 10:00:01] INFO User logged in\n") f.write("[2023-10-26 10:00:05] ERROR Failed to connect to DB\n") f.write("[2023-10-26 10:00:10] DEBUG Processing request\n") f.write("[2023-10-26 10:00:15] ERROR Invalid input data\n") log_lines = read_log_file('sample.log') filtered_errors = filter_errors(log_lines) error_timestamps = extract_timestamps(filtered_errors) print("Error Timestamps:") for ts in error_timestamps: print(ts)
이 예제에서 각 함수는 이전 단계에서 데이터를 소비하고 다음 단계에 대해 변환된 데이터를 생산하는 제너레이터입니다. 이렇게 하면 데이터가 한 번에 한 항목씩 지연 처리되므로 메모리 효율적인 파이프라인이 생성됩니다. 중간 목록은 생성되지 않으며, 이는 대규모 데이터셋에 중요합니다.
유한 상태 기계로서의 제너레이터
제너레이터는 send()
를 통해 값을 생성하고 입력을 받아 간단한 유한 상태 기계 역할을 할 수 있습니다. 이렇게 하면 단일 제너레이터 함수가 외부 이벤트에 따라 내부 상태를 관리할 수 있습니다.
특정 토큰에 따라 모드를 전환하는 간단한 파서를 고려해 봅시다.
def state_machine_parser(): state = "INITIAL" while True: token = yield state # 현재 상태를 생성하고 다음 토큰을 받습니다. if state == "INITIAL": if token == "START_BLOCK": state = "IN_BLOCK" elif token == "END_STREAM": print("Stream ended during INITIAL state.") return else: print(f"Ignoring token '{token}' in INITIAL state.") elif state == "IN_BLOCK": if token == "PROCESS_ITEM": print("Processing item inside block.") elif token == "END_BLOCK": state = "INITIAL" elif token == "END_STREAM": print("Stream ended during IN_BLOCK state.") return else: print(f"Handling token '{token}' inside block.") # 상태 기계 초기화 parser = state_machine_parser() next(parser) # 제너레이터를 시작하고 "INITIAL"을 생성합니다. print(parser.send("SOME_DATA")) # 출력: Ignoring token 'SOME_DATA' in INITIAL state. print(parser.send("START_BLOCK")) # 출력: IN_BLOCK print(parser.send("PROCESS_ITEM")) # 출력: Processing item inside block. print(parser.send("ANOTHER_ITEM")) # 출력: Handling token 'ANOTHER_ITEM' inside block. print(parser.send("END_BLOCK")) # 출력: INITIAL print(parser.send("END_STREAM")) # 출력: Stream ended during INITIAL state.
state_machine_parser
제너레이터는 현재 상태를 생성하고 자신에게 전송된 토큰을 소비합니다. 토큰과 현재 상태에 따라 새 상태로 전환하거나 작업을 수행합니다. 이 패턴은 이벤트 기반 시스템이나 프로토콜 파싱에 효과적입니다.
Asyncio를 사용한 코루틴
asyncio
라이브러리는 async
/await
구문과 함께 Python의 주요 비동기 프로그래밍 프레임워크를 제공합니다. yield
제너레이터를 코루틴으로 사용할 수 있지만, async def
코루틴은 asyncio
이벤트 루프와 더 명시적으로 통합됩니다.
비동기 태스크 구축
코루틴은 이벤트 루프에 의해 실행됩니다. await
는 하나의 코루틴, Future 또는 Task가 완료될 때까지 코루틴의 실행을 일시 중지하는 데 사용됩니다.
import asyncio import time async def fetch_data(delay, item_id): """비동기 네트워크 요청을 시뮬레이션합니다.""" print(f"[{time.time():.2f}] Start fetching data for item {item_id}") await asyncio.sleep(delay) # I/O 바운드 작업을 시뮬레이션합니다. print(f"[{time.time():.2f}] Finished fetching data for item {item_id}") return f"Data for {item_id} after {delay} seconds" async def main(): start_time = time.time() # 동시에 실행되는 여러 태스크 생성 task1 = asyncio.create_task(fetch_data(3, "A")) task2 = asyncio.create_task(fetch_data(1, "B")) task3 = asyncio.create_task(fetch_data(2, "C")) # 모든 태스크가 완료될 때까지 대기 results = await asyncio.gather(task1, task2, task3) print("\nAll tasks completed.") for res in results: print(res) end_time = time.time() print(f"Total execution time: {end_time - start_time:.2f} seconds") # 메인 코루틴 실행 if __name__ == "__main__": asyncio.run(main())
이 예제에서 fetch_data
는 데이터를 가져오는 것을 시뮬레이션하는 async
코루틴입니다. main
은 세 개의 그러한 태스크를 생성하고 asyncio.gather
를 사용하여 동시에 실행합니다. 태스크 A, B, C의 지연 시간이 각각 3, 1, 2초임에도 불구하고, 총 실행 시간은 합계(6초)가 아닌 최대 지연 시간(3초)에 가깝습니다. 이는 진정한 동시성을 보여줍니다.
yield from
(async/await 이전) 및 await
를 사용한 고급 코루틴 위임
async
/await
가 현대적인 방식이지만, 제너레이터 기반 코루틴의 yield from
을 이해하는 것은 Python 비동기 기능의 진화에 대한 통찰력을 제공합니다. yield from
을 사용하면 제너레이터가 작업의 일부를 다른 제너레이터에 위임할 수 있습니다. async
/await
를 사용하면 await
를 사용하여 다른 코루틴을 호출하면 이 위임이 더 명시적입니다.
async
/await
를 사용하여 이를 설명해 보겠습니다.
import asyncio async def sub_task(name, delay): print(f" Sub-task {name}: Starting...") await asyncio.sleep(delay) print(f" Sub-task {name}: Finished.") return f"Result from {name}" async def main_task(task_id): print(f"Main task {task_id}: Starting...") # sub_task에 실행 위임, sub_task가 완료될 때까지 main_task 일시 중지 result_a = await sub_task(f"{task_id}-A", 1) result_b = await sub_task(f"{task_id}-B", 0.5) print(f"Main task {task_id}: Received '{result_a}' and '{result_b}'.") return f"Main task {task_id} complete with {result_a}, {result_b}" async def orchestrator(): print("Orchestrator: Kicking off main tasks...") results = await asyncio.gather( main_task("X"), main_task("Y") ) print("\nOrchestrator: All main tasks finished.") for r in results: print(f"Final result: {r}") if __name__ == "__main__": asyncio.run(orchestrator())
여기서 orchestrator
는 main_task("X")
와 main_task("Y")
를 동시에 실행합니다. 각 main_task
는 차례로 sub_task
에 대해 순차적으로 await
합니다. 이것은 코루틴이 복잡하고 중첩된 비동기 작업을 구축하는 방법을 보여줍니다. await
키워드는 호출 코루틴에서 대기하는 코루틴으로 효과적으로 제어권을 위임하고, 완료되면 호출자를 재개합니다.
asyncio
를 사용한 동시성 기본 요소
asyncio
는 코루틴 실행을 관리하기 위한 스레딩 구성과 유사하지만 코루틴을 위해 설계된 여러 기본 요소를 제공합니다.
- 잠금(Lock) (
asyncio.Lock
): 한 번에 하나의 코루틴만 공유 리소스에 액세스하도록 보장하여 경쟁 조건을 방지합니다. - 세마포(Semaphore) (
asyncio.Semaphore
): 리소스에 동시에 액세스할 수 있는 코루틴 수를 제한합니다. 연결 풀링 또는 속도 제한에 유용합니다. - 이벤트(Event) (
asyncio.Event
): 코루틴이 서로 신호를 보낼 수 있도록 합니다. 코루틴은 이벤트가 설정될 때까지 기다릴 수 있으며, 다른 코루틴은 이벤트를 설정할 수 있습니다. - 큐(Queue) (
asyncio.Queue
): 코루틴 간의 통신을 위한 스레드 안전(및 코루틴 안전) 큐로, 프로듀서-소비자 패턴을 활성화합니다.
이러한 기본 요소는 공유 상태 및 리소스를 안전하게 관리하는 강력한 비동기 애플리케이션을 구축하는 데 필수적입니다.
결론
Python의 제너레이터와 코루틴, 특히 asyncio
프레임워크는 효율적이고 논블로킹이며 동시적인 코드를 작성하기 위한 강력한 도구를 제공합니다. 제너레이터를 사용한 우아한 데이터 파이프라이닝부터 async
/await
를 사용한 복잡한 비동기 워크플로 오케스트레이션에 이르기까지, 이러한 고급 기법을 익히는 것은 까다로운 계산 및 I/O 바운드 작업을 더 큰 효율성과 응답성으로 처리할 수 있도록 지원합니다. 이러한 기능을 활용하는 것은 현대적이고 고성능인 애플리케이션을 위해 Python의 전체 잠재력을 발휘하는 데 중요합니다.