[RxPy] 디버깅, 오류 처리하기
안녕하세요! 운동하는 개발자 Jay입니다!
오늘은 RxPy의 디버깅과 오류 처리에 대해서 알아보겠습니다.
1. do() 연산자로 디버깅하기 🐛
RxPy의 유틸리티 연산자 중 do_action()이 있습니다. do_action()은 rx.core.operators.do에 _do_action()을 wrapping 하고 있죠! rx.core.operators.do 에는 do 관련된 함수들이 모여있습니다.
왜 다른 do 함수들은 wrapping이 안되어 있는지 모르겠네요ㅎㅎ - 개인적인 생각으로는 do_action()으로 다른 do 함수들의 동작을 다 할 수 있어서 그런 것 같아요.
어떤 함수들이 있는지 한 번 봐볼까요?
- _do_action() : on_next, on_error, on_complted 호출 후에 실행되는 동작을 정의합니다.
- do_after_next() : on_next 호출 후에 실행되는 동작을 정의합니다.
- do_on_subscribe() : subscribe(구독) 호출 후에 실행되는 동작을 정의합니다.
- do_on_dispose() : OnDispose 호출 후에 실행되는 동작을 정의합니다.
- do_on_terminate() : on_complted, on_error가 호출될때 실행되는 동작을 정의합니다.
- do_after_terminate() : on_complted, on_error 호출 후에 실행되는 동작을 정의합니다.
- do_finally() : on_complete, on_error, disposal이 호출 된 후에 실행되는 동작을 정의합니다.
자 그럼, 몇가지 예제를 보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from rx import create
from rx.core.operators.do import do_on_terminate
def test_observable(observer, schedueler):
observer.on_next("1")
observer.on_next("2")
observer.on_next("3")
observer.on_completed()
source = create(test_observable)
do_on_terminate(
source,
lambda : print("source completed")
).subscribe(
lambda x: print(x)
)
|
do_on_terminate() 함수를 사용해서 on_complted가 호출되면 lambda함수를 호출하도록 해봤습니다.
결과를 확인해보겠습니다.
우리가 생각한 그대로 출력이 되었습니다!
아 그리고, 제가 do_on_terminate()를 연산자가 아닌 함수로 지칭한 건 wrapping되어 있지 않아서 pipe()에서 쓰지 못하기 때문에 함수라 지칭했습니다. (실제로 rx.operator 경로에 있지도 않음...불쌍...)
pipe()에서 사용하시려면 아래와 같이 wrapping 해서 사용하면 됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from rx import create
def test_observable(observer, schedueler):
observer.on_next("1")
observer.on_next("2")
observer.on_next("3")
observer.on_completed()
def do_on_terminate(on_terminate):
def _do_on_terminate(observer):
from rx.core.operators.do import do_on_terminate as __do_on_terminate
return __do_on_terminate(observer, on_terminate)
return _do_on_terminate
source = create(test_observable).pipe(
do_on_terminate(lambda : print("source completed"))
).subscribe(
lambda x: print(x)
)
|
간단하게 warpping 해서 쓸 수 있습니다. 결과는 동일하게 나옵니다!
실제로 rx.operator에 있는 do_action은 위처럼 wrapping 되어 있습니다.
이번에는 do_after_next() 함수를 사용해 보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from rx import create
from rx.core.operators.do import do_after_next
def test_observable(observer, schedueler):
observer.on_next("1")
observer.on_next("2")
observer.on_next("3")
observer.on_completed()
source = create(test_observable)
do_after_next(
source,
lambda x: print("source next {}".format(x))
).subscribe(
lambda x: print(x)
)
|
on_next가 호출될 때마다 print 하는 동작입니다.
정확히 우리가 예상한 대로 출력되는 결과를 볼 수 있습니다.
이렇듯 do_action() 연산자 혹은 do 함수들을 사용하면 RxPy에서 디버깅을 할 수 있습니다. 🐛
(예를 들면 현재 동작하는 스레드의 이름을 알고 싶을 때)
2. 오류 처리하기 (retry, catch) ⚠️
Rx에는 여러 가지 오류 처리하는 연산자들이 있는데, retry(), retryWhen() 등등... RxPY에는 retry()만 있습니다 ㅎㅎ
(왜 지원을 안 하는지는 구글링을 하거나 RxPy 깃 헙에 이슈 등록해서 질문을 해야 할 것 같습니다.)
무튼 RxPy의 retry() 연산자는 이름 그대로 오류가 나는 경우 몇 번 다시 시도할지를 정하는 연산자입니다.
파라미터로는 count를 받는데 None인 경우 무한반복입니다!!
예제로 확인해보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from rx import create
from rx.operators import retry
def test_observable(observer, schedueler):
observer.on_next("1")
assert 1 == 2
observer.on_next("2")
observer.on_next("3")
observer.on_completed()
source = create(test_observable).pipe(
retry(3)
).subscribe(
lambda x: print(x)
)
|
assert 1 == 2 로 에러를 발생시켰습니다.
예상되는 결과는 1이 retry count 만큼 출력되고 프로그램이 종료되어야 합니다.
오! 이번에도 예상대로 동작했네요 ㅋㅋㅋ
이번에는 catch() 연산자에 대해 알아보겠습니다. 이 연산자도 이름 그대로 에러에 대한 예외처리를 하는 연산자입니다.
catch()는 현재 처리 중인 observable에서 에러가 나는 경우 다른 observable로 이어서 처리할 수 있습니다.
코드로 보면 더 쉽게 알 수 있습니다! 예제 코드를 확인해보겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
from rx import create
from rx.operators import catch
def test_observable(observer, schedueler):
observer.on_next("1")
assert 1 == 2
observer.on_next("2")
observer.on_next("3")
observer.on_completed()
def test_catch(observer, schedueler):
observer.on_next("catch 1")
observer.on_next("catch 2")
observer.on_next("catch 3")
observer.on_completed()
catch_source = create(test_catch)
source = create(test_observable).pipe(
catch(catch_source)
).subscribe(
lambda x: print(x)
)
|
test_observable에서 오류가 나면 test_catch를 실행하도록 했습니다.
결과를 확인해 볼까요?
예~ 생각대로 잘 동작했습니다.
오류 연산자들을 잘 활용하면, 디버깅할 때도 사용할 수 있고 Rx에서 오류를 적절히 처리할 수 있겠네요!
오늘 준비한 내용은 여기까지입니다:D 그럼 오늘도 즐거운 코딩 하세요~