-
[RxPy] 디버깅, 오류 처리하기💻 프로그래밍/Python 2020. 7. 15. 17:52
안녕하세요! 운동하는 개발자 Jay입니다!
오늘은 RxPy의 디버깅과 오류 처리에 대해서 알아보겠습니다.
1. do() 연산자로 디버깅하기 🐛
RxPy의 유틸리티 연산자 중 do_action()이 있습니다. do_action()은 rx.core.operators.do에 _do_action()을 wrapping 하고 있죠! rx.core.operators.do 에는 do 관련된 함수들이 모여있습니다.
왜 다른 do 함수들은 wrapping이 안되어 있는지 모르겠네요ㅎㅎ - 개인적인 생각으로는 do_action()으로 다른 do 함수들의 동작을 다 할 수 있어서 그런 것 같아요.
어떤 함수들이 있는지 한 번 봐볼까요?
- _do_action() : on_next, on_error, on_complted 호출 후에 실행되는 동작을 정의합니다.
- do_after_next() : on_next 호출 후에 실행되는 동작을 정의합니다.
- do_on_subscribe() : subscribe(구독) 호출 후에 실행되는 동작을 정의합니다.
- do_on_dispose() : OnDispose 호출 후에 실행되는 동작을 정의합니다.
- do_on_terminate() : on_complted, on_error가 호출될때 실행되는 동작을 정의합니다.
- do_after_terminate() : on_complted, on_error 호출 후에 실행되는 동작을 정의합니다.
- do_finally() : on_complete, on_error, disposal이 호출 된 후에 실행되는 동작을 정의합니다.
자 그럼, 몇가지 예제를 보겠습니다.
123456789101112131415161718from rx import createfrom rx.core.operators.do import do_on_terminatedef test_observable(observer, schedueler):observer.on_next("1")observer.on_next("2")observer.on_next("3")observer.on_completed()source = create(test_observable)do_on_terminate(source,lambda : print("source completed")).subscribe(lambda x: print(x))do_on_terminate() 함수를 사용해서 on_complted가 호출되면 lambda함수를 호출하도록 해봤습니다.
결과를 확인해보겠습니다.
우리가 생각한 그대로 출력이 되었습니다!
아 그리고, 제가 do_on_terminate()를 연산자가 아닌 함수로 지칭한 건 wrapping되어 있지 않아서 pipe()에서 쓰지 못하기 때문에 함수라 지칭했습니다. (실제로 rx.operator 경로에 있지도 않음...불쌍...)
pipe()에서 사용하시려면 아래와 같이 wrapping 해서 사용하면 됩니다.
12345678910111213141516171819202122232425from rx import createdef test_observable(observer, schedueler):observer.on_next("1")observer.on_next("2")observer.on_next("3")observer.on_completed()def do_on_terminate(on_terminate):def _do_on_terminate(observer):from rx.core.operators.do import do_on_terminate as __do_on_terminatereturn __do_on_terminate(observer, on_terminate)return _do_on_terminatesource = create(test_observable).pipe(do_on_terminate(lambda : print("source completed"))).subscribe(lambda x: print(x))간단하게 warpping 해서 쓸 수 있습니다. 결과는 동일하게 나옵니다!
실제로 rx.operator에 있는 do_action은 위처럼 wrapping 되어 있습니다.
이번에는 do_after_next() 함수를 사용해 보겠습니다.
12345678910111213141516171819from rx import createfrom rx.core.operators.do import do_after_nextdef test_observable(observer, schedueler):observer.on_next("1")observer.on_next("2")observer.on_next("3")observer.on_completed()source = create(test_observable)do_after_next(source,lambda x: print("source next {}".format(x))).subscribe(lambda x: print(x))on_next가 호출될 때마다 print 하는 동작입니다.
정확히 우리가 예상한 대로 출력되는 결과를 볼 수 있습니다.
이렇듯 do_action() 연산자 혹은 do 함수들을 사용하면 RxPy에서 디버깅을 할 수 있습니다. 🐛
(예를 들면 현재 동작하는 스레드의 이름을 알고 싶을 때)
2. 오류 처리하기 (retry, catch) ⚠️
Rx에는 여러 가지 오류 처리하는 연산자들이 있는데, retry(), retryWhen() 등등... RxPY에는 retry()만 있습니다 ㅎㅎ
(왜 지원을 안 하는지는 구글링을 하거나 RxPy 깃 헙에 이슈 등록해서 질문을 해야 할 것 같습니다.)
무튼 RxPy의 retry() 연산자는 이름 그대로 오류가 나는 경우 몇 번 다시 시도할지를 정하는 연산자입니다.
파라미터로는 count를 받는데 None인 경우 무한반복입니다!!
예제로 확인해보겠습니다.
123456789101112131415161718from rx import createfrom rx.operators import retrydef test_observable(observer, schedueler):observer.on_next("1")assert 1 == 2observer.on_next("2")observer.on_next("3")observer.on_completed()source = create(test_observable).pipe(retry(3)).subscribe(lambda x: print(x))assert 1 == 2 로 에러를 발생시켰습니다.
예상되는 결과는 1이 retry count 만큼 출력되고 프로그램이 종료되어야 합니다.
오! 이번에도 예상대로 동작했네요 ㅋㅋㅋ
이번에는 catch() 연산자에 대해 알아보겠습니다. 이 연산자도 이름 그대로 에러에 대한 예외처리를 하는 연산자입니다.
catch()는 현재 처리 중인 observable에서 에러가 나는 경우 다른 observable로 이어서 처리할 수 있습니다.
코드로 보면 더 쉽게 알 수 있습니다! 예제 코드를 확인해보겠습니다.
1234567891011121314151617181920212223242526from rx import createfrom rx.operators import catchdef test_observable(observer, schedueler):observer.on_next("1")assert 1 == 2observer.on_next("2")observer.on_next("3")observer.on_completed()def test_catch(observer, schedueler):observer.on_next("catch 1")observer.on_next("catch 2")observer.on_next("catch 3")observer.on_completed()catch_source = create(test_catch)source = create(test_observable).pipe(catch(catch_source)).subscribe(lambda x: print(x))test_observable에서 오류가 나면 test_catch를 실행하도록 했습니다.
결과를 확인해 볼까요?
예~ 생각대로 잘 동작했습니다.
오류 연산자들을 잘 활용하면, 디버깅할 때도 사용할 수 있고 Rx에서 오류를 적절히 처리할 수 있겠네요!
오늘 준비한 내용은 여기까지입니다:D 그럼 오늘도 즐거운 코딩 하세요~
'💻 프로그래밍 > Python' 카테고리의 다른 글
비동기로 Third-party API 처리하기 (feat. aiohttp, asyncio) (0) 2021.04.07 filter() + lambda VS list comprehension (2) 2020.08.10 [RxPy] CPU Concurrency와 Muliti Thread로 Rx하기 (1) 2020.07.14 [RxPy] Operator 응용과 Custom Operator 만들기(feat. 메서드 체이닝(Method Chaning)) (0) 2020.06.29 [RxPy] Reactive Programming? Rx? 그게뭐야? (2) 2020.06.23