💻 프로그래밍/Python

[RxPy] 디버깅, 오류 처리하기

피트웨어 제이 (FitwareJay) 2020. 7. 15. 17:52

안녕하세요! 운동하는 개발자 Jay입니다!

오늘은 RxPy의 디버깅과 오류 처리에 대해서 알아보겠습니다.

 

1. do() 연산자로 디버깅하기 🐛 


RxPy의 유틸리티 연산자 중 do_action()이 있습니다. do_action()rx.core.operators.do_do_action()을 wrapping 하고 있죠! rx.core.operators.do 에는 do 관련된 함수들이 모여있습니다.

 

왜 다른 do 함수들은 wrapping이 안되어 있는지 모르겠네요ㅎㅎ - 개인적인 생각으로는 do_action()으로 다른 do 함수들의 동작을 다 할 수 있어서 그런 것 같아요.

 

어떤 함수들이 있는지 한 번 봐볼까요?

  • _do_action() : on_next, on_error, on_complted 호출 후에 실행되는 동작을 정의합니다.
  • do_after_next() : on_next 호출 후에 실행되는 동작을 정의합니다.
  • do_on_subscribe() : subscribe(구독) 호출 후에 실행되는 동작을 정의합니다.
  • do_on_dispose() : OnDispose 호출 후에 실행되는 동작을 정의합니다.
  • do_on_terminate() : on_complted, on_error가 호출될때 실행되는 동작을 정의합니다.
  • do_after_terminate() : on_complted, on_error 호출 후에 실행되는 동작을 정의합니다. 
  • do_finally() : on_complete, on_error, disposal이 호출 된 후에 실행되는 동작을 정의합니다.

 

자 그럼, 몇가지 예제를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from rx import create
from rx.core.operators.do import do_on_terminate
 
def test_observable(observer, schedueler):
    observer.on_next("1")
    observer.on_next("2")
    observer.on_next("3")
    observer.on_completed()
    
 
source = create(test_observable)
do_on_terminate(
    source,
    lambda : print("source completed")
).subscribe(
    lambda x: print(x)
)
 
 

do_on_terminate() 함수를 사용해서 on_complted가 호출되면 lambda함수를 호출하도록 해봤습니다.

결과를 확인해보겠습니다.

 

on_complted 호출 후 출력된 문자열

우리가 생각한 그대로 출력이 되었습니다!

 

아 그리고, 제가 do_on_terminate()를 연산자가 아닌 함수로 지칭한 건 wrapping되어 있지 않아서 pipe()에서 쓰지 못하기 때문에 함수라 지칭했습니다. (실제로 rx.operator 경로에 있지도 않음...불쌍...)

 

pipe()에서 사용하시려면 아래와 같이 wrapping 해서 사용하면 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from rx import create
 
 
def test_observable(observer, schedueler):
    observer.on_next("1")
    observer.on_next("2")
    observer.on_next("3")
    observer.on_completed()
 
 
def do_on_terminate(on_terminate):
    def _do_on_terminate(observer):
        from rx.core.operators.do import do_on_terminate as __do_on_terminate
 
        return __do_on_terminate(observer, on_terminate)
 
    return _do_on_terminate
 
 
source = create(test_observable).pipe(
    do_on_terminate(lambda : print("source completed"))
).subscribe(
    lambda x: print(x)
)
 
 

간단하게 warpping 해서 쓸 수 있습니다. 결과는 동일하게 나옵니다!

 

from rx.operators import do_action

실제로 rx.operator에 있는 do_action은 위처럼 wrapping 되어 있습니다.

이번에는 do_after_next() 함수를 사용해 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from rx import create
from rx.core.operators.do import do_after_next
 
 
def test_observable(observer, schedueler):
    observer.on_next("1")
    observer.on_next("2")
    observer.on_next("3")
    observer.on_completed()
 
 
source = create(test_observable)
do_after_next(
    source,
    lambda x: print("source next {}".format(x))
).subscribe(
    lambda x: print(x)
)
 
 

on_next가 호출될 때마다 print 하는 동작입니다.

 

on_next 호출후 출력된 string

정확히 우리가 예상한 대로 출력되는 결과를 볼 수 있습니다.

이렇듯 do_action() 연산자 혹은 do 함수들을 사용하면 RxPy에서 디버깅을 할 수 있습니다. 🐛

(예를 들면 현재 동작하는 스레드의 이름을 알고 싶을 때)

 

 

2. 오류 처리하기 (retry, catch) ⚠️ 


Rx에는 여러 가지 오류 처리하는 연산자들이 있는데, retry(), retryWhen() 등등... RxPY에는 retry()만 있습니다 ㅎㅎ

(왜 지원을 안 하는지는 구글링을 하거나 RxPy 깃 헙에 이슈 등록해서 질문을 해야 할 것 같습니다.)

 

무튼 RxPy의 retry() 연산자는 이름 그대로 오류가 나는 경우 몇 번 다시 시도할지를 정하는 연산자입니다.

 

retry() 내부 로직

파라미터로는 count를 받는데 None인 경우 무한반복입니다!!

예제로 확인해보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from rx import create
from rx.operators import retry
 
 
def test_observable(observer, schedueler):
    observer.on_next("1")
    assert 1 == 2
    observer.on_next("2")
    observer.on_next("3")
    observer.on_completed()
 
 
source = create(test_observable).pipe(
    retry(3)
).subscribe(
    lambda x: print(x)
)
 
 

assert 1 == 2 로 에러를 발생시켰습니다. 

예상되는 결과는 1이 retry count 만큼 출력되고 프로그램이 종료되어야 합니다.

 

3번 retry 후 종료

오! 이번에도 예상대로 동작했네요 ㅋㅋㅋ

이번에는 catch() 연산자에 대해 알아보겠습니다. 이 연산자도 이름 그대로 에러에 대한 예외처리를 하는 연산자입니다.

catch()는 현재 처리 중인 observable에서 에러가 나는 경우 다른 observable로 이어서 처리할 수 있습니다.

 

catch() 동작 예시

코드로 보면 더 쉽게 알 수 있습니다! 예제 코드를 확인해보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from rx import create
from rx.operators import catch
 
 
def test_observable(observer, schedueler):
    observer.on_next("1")
    assert 1 == 2
    observer.on_next("2")
    observer.on_next("3")
    observer.on_completed()
 
 
def test_catch(observer, schedueler):
    observer.on_next("catch 1")
    observer.on_next("catch 2")
    observer.on_next("catch 3")
    observer.on_completed()
 
 
catch_source = create(test_catch)
source = create(test_observable).pipe(
    catch(catch_source)
).subscribe(
    lambda x: print(x)
)
 
 

test_observable에서 오류가 나면 test_catch를 실행하도록 했습니다.

결과를 확인해 볼까요?

 

test_catch 옵저버블로 프로그램이 마무리

예~ 생각대로 잘 동작했습니다.

오류 연산자들을 잘 활용하면, 디버깅할 때도 사용할 수 있고 Rx에서 오류를 적절히 처리할 수 있겠네요!

오늘 준비한 내용은 여기까지입니다:D 그럼 오늘도 즐거운 코딩 하세요~