RabbitMQ 톺아보기 2부 (feat.pika)
RabbitMQ 톺아보기 1부에 이어서 2부를 시작하겠습니다!
2부에서는 실제 python application으로 RabbitMQ를 활용하는 방법에 대해 알아보려고 합니다. pika라는 라이브러리를 활용할 예정이에요!
pika는 RabbitMQ 공식 홈페이지 듀토리얼에서도 사용되는 라이브러리입니다! 공식홈페이지에 보면 다양한 언어에 대해 친절하게 예제들이 만들어져 있습니다! RabbitMQ 듀토리얼과 pika공식문서가 진짜 잘 나와 있어서 여길 보시는 게 제일 좋은 것 같아요! 일단 그냥 속성으로 아 이렇게 쓰구나 정도로 제 글을 보시면 될 것 같습니다!
1. pika란?
피카(pika)란 피카 피카 피카츄~~~백만볼트를 사용하는 라이브러리...
에헴... pika는 RabbitMQ를 사용할 수 있게 도와주는 python 라이브러리 입니다.
앞서 말씀드렸다시피 RabbitMQ 공식 튜도리얼에서도 이 라이브러리를 사용하는 거면 상당히(?) 검증이 되어있는 라이브러리라고 볼 수 있을 것 같습니다.
먼저 간단히 pika를 이용해서 RabbitMQ 커넥션과 Pub/Sub을 담당하는 모듈을 만들어 보겠습니다.
2. Pub/Sub 모듈 만들기
pika는 동기, 비동기 커넥션 모듈을 제공합니다. 비동기 어댑터에서는 별도의 I/O loop와 콜백 형태의 사용 방식으로 비동기 구현을 할 수 있습니다. pika로 RabbitMQ 커넥션 하는 방법 중 가장 간단하고 쉬운 방법은 BlockingConnection을 사용해 동기적으로 처리하는 방법입니다.
일단은 BlockingConnection 어댑터를 사용해서 Pub/Sub모듈을 만들고 간단히 테스트 먼저 해보겠습니다. 뒤에서 비동기 어댑터에 대한 내용을 좀 더 자세히 다루도록 할게요!
첫 번째로 BasicPikaClient라는 클래스를 정의했습니다. 기본적으로 RabbitMQ와의 커넥션을 만들어주는 클래스입니다. PlainCredentals은 말 그대로 인증과 관련된 클래스입니다.
ConnectionParameters를 통해 url, host, user, password, port, credentials 등의 값을 BlockingConnection 어댑터로 전달하여 인스턴스를 생성합니다. 생성된 커넥션 객체를 통하여 chnnel을 생성할 수 있습니다.
chnnel은 간단히 말하자면 "단일 TCP 연결을 공유하는 경량화된 연결"이라고 합니다. 뭐 TCP연결을 여러 개 만들어서 사용하면 리소스 낭비니까 하나의 TCP 연결에 여러 개의 chnnel을 만들어서 공유한다는 의미인 것 같습니다.
주의사항으로는 멀티 프로세스, 스레드인 경우 chnnel을 공유하지 말고 새로운 각 프로세스, 스레드마다 새로운 chnnel을 만들어서 사용하는 것이 일반적이라고 합니다.
좀 더 자세한 chnnel에 대한 설명은 여기서 확인하시길 바라요!
기본적인 커넥션에 대한 클래스를 생성했으니 publisher, consumer 클래스를 만들어 보겠습니다.
코드를 보면 queue, exchange를 선언하는 메소드와 메세지를 publish 하는 메소드가 있습니다.
이벤트를 소비하는 클래스에서는 하나의 메세지만 처리하는 메소드, 메세지를 계속 소비하는 메소드 두 가지가 있습니다.
여기서 주의해서 봐야 할 건 basic_consume의 auto_ack파라미터 인데요. 말 그대로 acknowledgement를 자동으로 처리할 건지에 대한 옵션입니다. True인 경우 자동으로 처리해주고 아닌 경우 사용자가 직접 ack에 대한 처리를 해줘야 합니다.
여기서 ack에 대한 처리란 RabbitMQ로 ack를 보내줘야 한다는 의미입니다. 메세지에 대한 callback을 마지막까지 잘 처리했다는 걸 보장하는 의미입니다. 중간에 로직이 잘못되어 실행이 안 되는 경우 다시 이 메세지를 소비해야 하기 때문입니다.
auto_ack가 False일때 메시지를 모두 처리하고 ack를 RabbitMQ로 전달하지 않은경우 위 처럼 메시지가 삭제되지 않고 Unacked에 남아있게 됩니다. 이때 실행중인 consumer를 중지하면 Ready로 메세지가 이동합니다.
start_consuming()은모든 consumer들이 canceld 될 때까지 I/O events, dispatchers timers, 'basic_consume' callback 등을 처리한다고 합니다. while문을 돌다가 채널이 close 되면 rasie 호출되면서 종료되는 형태로 보입니다.
내부적으로 어떻게 되나 궁금해서 좀 더 파보았는데... 계속 event를 polling 하여 처리할 event가 있는지 확인합니다.
event가 존재하면 basic_consum()의 on_messge_callback 파라미터로 넘겨준 함수가 callback으로 실행됩니다.
근본적으로 RabbtMQ에서 데이터를 어떻게 받아오는지 보고 싶어서 좀 더 들여다보았는데요! on_socket_readable 메소드의 설명을 보면
소켓에서 데이터를 수집하고 예외가 발생하거나(일반적으로 EAGAIN 또는 EWOULDBLOCK), 이벤트별 데이터 소비 제한에 도달하거나, 전송이 비활성화되거나, 실패할 때까지 데이터를 프로토콜에 전달합니다.
라고 하네요. 당연한 거긴 하지만 결국 모든 것은 소켓을 통해...ㅎㅎ
3. 메세지 생성 및 소비 테스트
1초 간격으로 메세지를 계속 생성하고, 그 메세지를 소비하는 예제를 만들어 보겠습니다.
저는 exchange를 topic타입으로 설정해서 테스트했습니다. 위 두 프로그램을 실행시키면 아래처럼 동작을 하게 됩니다.
RabbitMQ GUI에서 보면 현재 상태는 이렇습니다.
4. 비동기 어댑터 (Asynchronous adapter)
위에서 잠시 설명했던 비동기 어댑터에 대해서 설명해보겠습니다.
blocking connection adapter 모듈은 Pika의 핵심 AMQP 드라이버 위에 blocking semantics을 구현합니다.
blocking connection adapter를 사용할 때 대부분의 비동기식 예상(asynchronous expectations)이 제거되지만,
서버에서 보낸 RPC 명령의 지원, AMQP 프로토콜의 비동기식 RPC 특성에 충실하려고 시도합니다.
공식문서 설명을 번역한 건데 정확히 맞는 표현인지는 조금 애매하지만, 결론적으로는 동기적(synchronous)으로 브로커(RabbitMQ)로부터 받은 message를 소비하는 모듈입니다.
그럼 비동기적으로 처리하는 방법이 있다는 건데 위에서 보다시피 AsyncConnection, SelectConnection, GeventConnection, TornadoConnection, TwistedProtocolConnection이 비동기적으로 event를 처리하는 어댑터들입니다.
비동기 커넥션 어댑터들은 BlockingConnection과는 다르게 별도의 I/O loop와 콜백 형태의 문법을 통해 비동기적으로 event를 처리할 수 있습니다. 콜백형태의 문법이라 다소 파이써닉하지 않고 복잡하다고 하네요ㅎㅎ
이렇게 글로 보는 것보다는 코드로 한번 확인하는 게 좋을 것 같습니다! 저는 AsyncConnection을 사용해서 설명드리겠습니다.
(영문으로 정확한 내용을 파악하시려면 공식문서를 보시길 추천드립니다!)
publishing 하는 모듈은 동기로 그냥 사용하도록 하겠습니다. 비동기로 메세지를 소비하는 예제를 보겠습니다!
main 함수에서 gather()를 통해 코루틴(random_task)을 실행시킵니다. 또 create_task()로 do_rabbitmq()를 schedule 합니다.
AsyncioConnection으로 커넥션을 생성하는 모듈을 만들었습니다. 중요 코드만 설명하고 전체 코드는 깃헙을 참고해주시길 바랍니다.
생성자에서 새로운 이벤트 루프(new_event_loop())를 만들어서 AyncioConnection의 custom_ioloop로 전달합니다.
참고 (요건 코드를 커스텀 해서 사용하는 부분에 대한 저의 개인적인 의견 - 정확하지 않을 수 있습니다)
새로운 event loop를 만든 이유는 공개할 소스코드를 보면 확인할 수 있습니다만, 간략히 설명드리자면...
코드에 RabbitMQ 커넥션을 시작하는 start 메소드가 있는데 여기서 새로운 스레드를 만들고 event loop를 run_forever()로 running 시키기 때문입니다. 만약 이 상태에서 RabbitMQConnection클래스의 생성자에서 get_event_loop()를 사용하면 main thread의 evnet loop를 가져오게 되고 이미 running중인 event loop를 재실행하려고 해서 RuntimeError가 납니다.
그래서 main thread의 event loop를 사용해서 rabbitmq consumer를 비동기로 처리하고 싶다고 하시면 RabbitMQConnection의 start 메소드를 실행하지 않고 생성자에서 get_event_loop()를 호출하면 됩니다.
저도 참고한 코드라 처음 작성자의 의도는 모르겠지만... main thread가 아닌 새로운 thread의 event loop를 사용하려는 이유를 추측해 보자면 애초에 하나의 thread(or process)에 하나의 chnnel을 권장하기도 하고 main thread의 event loop를 그대로 사용하면 main에서 사용하는 다른 event들과 pika에서 처리하는 event들이 같은 event loop에서 처리되기 때문에 작업이 겹치는(?) 상황이라 이런 부분이 조금 어색하기 때문이지 않나 생각됩니다.
결론은
- 용도, 역할별로 별도의 event loop를 만들어서 사용하자라는 의미로 새로운 thread에 새로운 event loop를 사용하지 않았을까
- 그리고 main은 main대로 pika는 pika대로 따로 처리해야 하기 때문에
저는 이런 의미라고 이해하고 사용했습니다. (혹시나 다른 의견이 있으시다면 댓글로 남겨주세요!)
5. pika 문서를 보면서 잘 모르던 단어
문서를 보면서 잘 모르지만 중요한 거 같은 단어들에 대해서 정리해보려고 합니다.
첫 번째는 RPC입니다!
여기서 말하는 RPC는 gRPC의 RPC와 동일한 개념인데요. RPC(Remote Procedure Call)란 간단하게 Client는 Request를 Server에 전달하고 Server는 해당 Request를 처리하여 알맞은 결과 값을 다시 Client에 Response 해주는 방법이라고 합니다.
두 번째는 CPS(continuation-passing style)입니다!
pika문서를 보다 보면 CPS에 대한 언급이 자주 나오는데요. CPS 코딩 패턴은 함수 실행이 끝남과 동시에 연이어 함수가 또 실행되는 프로그래밍 모델로 callback형태의 스타일을 말합니다.
CPS에 대한 자세한 설명은 아래 블로그를 참고하시길 바랍니다!
6. 마치며
pika라이브러리에 대한 설명만 하려고 했는데 Asyncio라던지 RPC, CPS 등 생소한 용어들에 대한 설명도 함께 하려다 보니 다소 길어지기도 하고 정리가 잘 안 되는 부분도 있긴 하네요 ㅎㅎ
내용을 pika 동기, 비동기 커넥션 두 가지로 쪼개는 게 좀 더 설명하기 좋았을 텐데... 이렇게 길어질 줄은 몰랐습니다. 그리고 공식문서와 깃헙을 보다보니 영어해석이 좀 난해한 부분이 있어서...영어 공부를 해야겠다는 생각을 했습니다 ㅠㅠ 구글번역으로는 뭔가 확 와닿지 않는 부분도 있고 해석에 자신감이 없으니 좀 더 이해하는데 오래걸리는 것 같기도 하네요.
여튼 여러분들이 pika문서 보시면서 한번 직접 해보시길 바래요! 공식문서에 여러 connection adaptor들을 사용한 예제도 잘 되어있어서 좋더라고요!
그럼 오늘도 좋은 하루 보내세요!
참고자료
RabbitMQ connection, chnnel 설명
https://stackoverflow.com/questions/11987838/which-form-of-connection-to-use-with-pika