ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RxPy] 디버깅, 오류 처리하기
    💻 프로그래밍/Python 2020. 7. 15. 17:52

    안녕하세요! 운동하는 개발자 Jay입니다!

    오늘은 RxPy의 디버깅과 오류 처리에 대해서 알아보겠습니다.

     

    1. do() 연산자로 디버깅하기 🐛 


    RxPy의 유틸리티 연산자 중 do_action()이 있습니다. do_action()rx.core.operators.do_do_action()을 wrapping 하고 있죠! rx.core.operators.do 에는 do 관련된 함수들이 모여있습니다.

     

    왜 다른 do 함수들은 wrapping이 안되어 있는지 모르겠네요ㅎㅎ - 개인적인 생각으로는 do_action()으로 다른 do 함수들의 동작을 다 할 수 있어서 그런 것 같아요.

     

    어떤 함수들이 있는지 한 번 봐볼까요?

    • _do_action() : on_next, on_error, on_complted 호출 후에 실행되는 동작을 정의합니다.
    • do_after_next() : on_next 호출 후에 실행되는 동작을 정의합니다.
    • do_on_subscribe() : subscribe(구독) 호출 후에 실행되는 동작을 정의합니다.
    • do_on_dispose() : OnDispose 호출 후에 실행되는 동작을 정의합니다.
    • do_on_terminate() : on_complted, on_error가 호출될때 실행되는 동작을 정의합니다.
    • do_after_terminate() : on_complted, on_error 호출 후에 실행되는 동작을 정의합니다. 
    • do_finally() : on_complete, on_error, disposal이 호출 된 후에 실행되는 동작을 정의합니다.

     

    자 그럼, 몇가지 예제를 보겠습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from rx import create
    from rx.core.operators.do import do_on_terminate
     
    def test_observable(observer, schedueler):
        observer.on_next("1")
        observer.on_next("2")
        observer.on_next("3")
        observer.on_completed()
        
     
    source = create(test_observable)
    do_on_terminate(
        source,
        lambda : print("source completed")
    ).subscribe(
        lambda x: print(x)
    )
     
     

    do_on_terminate() 함수를 사용해서 on_complted가 호출되면 lambda함수를 호출하도록 해봤습니다.

    결과를 확인해보겠습니다.

     

    on_complted 호출 후 출력된 문자열

    우리가 생각한 그대로 출력이 되었습니다!

     

    아 그리고, 제가 do_on_terminate()를 연산자가 아닌 함수로 지칭한 건 wrapping되어 있지 않아서 pipe()에서 쓰지 못하기 때문에 함수라 지칭했습니다. (실제로 rx.operator 경로에 있지도 않음...불쌍...)

     

    pipe()에서 사용하시려면 아래와 같이 wrapping 해서 사용하면 됩니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    from rx import create
     
     
    def test_observable(observer, schedueler):
        observer.on_next("1")
        observer.on_next("2")
        observer.on_next("3")
        observer.on_completed()
     
     
    def do_on_terminate(on_terminate):
        def _do_on_terminate(observer):
            from rx.core.operators.do import do_on_terminate as __do_on_terminate
     
            return __do_on_terminate(observer, on_terminate)
     
        return _do_on_terminate
     
     
    source = create(test_observable).pipe(
        do_on_terminate(lambda : print("source completed"))
    ).subscribe(
        lambda x: print(x)
    )
     
     

    간단하게 warpping 해서 쓸 수 있습니다. 결과는 동일하게 나옵니다!

     

    from rx.operators import do_action

    실제로 rx.operator에 있는 do_action은 위처럼 wrapping 되어 있습니다.

    이번에는 do_after_next() 함수를 사용해 보겠습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    from rx import create
    from rx.core.operators.do import do_after_next
     
     
    def test_observable(observer, schedueler):
        observer.on_next("1")
        observer.on_next("2")
        observer.on_next("3")
        observer.on_completed()
     
     
    source = create(test_observable)
    do_after_next(
        source,
        lambda x: print("source next {}".format(x))
    ).subscribe(
        lambda x: print(x)
    )
     
     

    on_next가 호출될 때마다 print 하는 동작입니다.

     

    on_next 호출후 출력된 string

    정확히 우리가 예상한 대로 출력되는 결과를 볼 수 있습니다.

    이렇듯 do_action() 연산자 혹은 do 함수들을 사용하면 RxPy에서 디버깅을 할 수 있습니다. 🐛

    (예를 들면 현재 동작하는 스레드의 이름을 알고 싶을 때)

     

     

    2. 오류 처리하기 (retry, catch) ⚠️ 


    Rx에는 여러 가지 오류 처리하는 연산자들이 있는데, retry(), retryWhen() 등등... RxPY에는 retry()만 있습니다 ㅎㅎ

    (왜 지원을 안 하는지는 구글링을 하거나 RxPy 깃 헙에 이슈 등록해서 질문을 해야 할 것 같습니다.)

     

    무튼 RxPy의 retry() 연산자는 이름 그대로 오류가 나는 경우 몇 번 다시 시도할지를 정하는 연산자입니다.

     

    retry() 내부 로직

    파라미터로는 count를 받는데 None인 경우 무한반복입니다!!

    예제로 확인해보겠습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    from rx import create
    from rx.operators import retry
     
     
    def test_observable(observer, schedueler):
        observer.on_next("1")
        assert 1 == 2
        observer.on_next("2")
        observer.on_next("3")
        observer.on_completed()
     
     
    source = create(test_observable).pipe(
        retry(3)
    ).subscribe(
        lambda x: print(x)
    )
     
     

    assert 1 == 2 로 에러를 발생시켰습니다. 

    예상되는 결과는 1이 retry count 만큼 출력되고 프로그램이 종료되어야 합니다.

     

    3번 retry 후 종료

    오! 이번에도 예상대로 동작했네요 ㅋㅋㅋ

    이번에는 catch() 연산자에 대해 알아보겠습니다. 이 연산자도 이름 그대로 에러에 대한 예외처리를 하는 연산자입니다.

    catch()는 현재 처리 중인 observable에서 에러가 나는 경우 다른 observable로 이어서 처리할 수 있습니다.

     

    catch() 동작 예시

    코드로 보면 더 쉽게 알 수 있습니다! 예제 코드를 확인해보겠습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    from rx import create
    from rx.operators import catch
     
     
    def test_observable(observer, schedueler):
        observer.on_next("1")
        assert 1 == 2
        observer.on_next("2")
        observer.on_next("3")
        observer.on_completed()
     
     
    def test_catch(observer, schedueler):
        observer.on_next("catch 1")
        observer.on_next("catch 2")
        observer.on_next("catch 3")
        observer.on_completed()
     
     
    catch_source = create(test_catch)
    source = create(test_observable).pipe(
        catch(catch_source)
    ).subscribe(
        lambda x: print(x)
    )
     
     

    test_observable에서 오류가 나면 test_catch를 실행하도록 했습니다.

    결과를 확인해 볼까요?

     

    test_catch 옵저버블로 프로그램이 마무리

    예~ 생각대로 잘 동작했습니다.

    오류 연산자들을 잘 활용하면, 디버깅할 때도 사용할 수 있고 Rx에서 오류를 적절히 처리할 수 있겠네요!

    오늘 준비한 내용은 여기까지입니다:D 그럼 오늘도 즐거운 코딩 하세요~

    댓글 2

운동하는 개발자 JAY-JI