ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [RxPy] Reactive Programming? Rx? 그게뭐야?
    💻 프로그래밍/Python 2020. 6. 23. 02:30

     

    안녕하세요! 개발자 JAY 입니다! 오늘은 Reactive Programming 그리고 Rx에 대해서 이야기 해보겠습니다.

     

    처음 Rx에 대해서 들어본 건 iOS 개발자 동료들이 RxSwift를 맨날 이야기하길래 ㅋㅋ 궁금해서 물어봤더니 비동기 프로그래밍 관련된 내용이라고 하더라고요! 그리고 요즘 유행(?)이라고 ㅋㅋㅋ

     

    그래서 "아~그런가 보다" 하고 지나가다가 회사 안드로이드 개발자분이 Rx스터디를 열어서 바로 한다고 했습니다.

    (평소 Rx가 궁금하긴 했지만 의지박약이라 실천에 못 옮겼었는데.. 매우 잘.. 되었습니다 ㅋㅋ)

     

     

    1. What is Reactive Programming? 🤔


    먼저 Reactive Progrming에 대해서 간단히 알아보겠습니다. 늘 그렇듯 위키피디아 를 참고하였습니다.

    In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.


    Reactie Programming
    (반응형 프로그래밍) 은 데이터 스트림변화의 전달(데이터 스트림에서 값이 변했을 때  전달이 이루어지는 것)에 대한 선언적 프로그래밍 패러다임이다.

     

    흠.... 사실 이렇게만 보면 잘 모르겠는데, 좀 더 위키에 있는 내용을 코드로 설명하자면,

    명령형 프로그래밍에서 a = b + c 를 실행하면 b + c 의 결과가 할당되고, b, c 의 값의 변화는 a에 영향을 주지 않습니다. 반면, 리액티브 프로그래밍에서는 b 와 c의 값이 변경될 때마다  a = b + c를 실행하지 않고 자동으로 업데이트되어 a의 값을 결정합니다.

     

    그래서 요약하자면 Reactive Programming 은 데이터 스트림을 비동기적인 흐름으로 처리하는 프로그래밍(?)이라고 볼 수 있을 것 같습니다. 

     

     

    2. 그럼 Rx는 뭐지? 🤷‍♂️


    RxReactiveX의 줄임말로 Rx홈페이지에 가면 이렇게 나와있네요.

    An API for asynchronous programming with observable streams.


    (observable streams이라고 하는데 Observer pattern 기반인 것 같습니다. 관련 여기서 확인해주세요!)

     

    marble diagrams (출처: ReactrivX)

    블라블라 영어로 뭐라 쓰여있지만, 우리는 Rx가 비동기 프로그래밍을 위한 API 정도로만 알면 될 것 같습니다! 

    여기서 Observer pattern까지 알면 좋지만, 이건 따로 공부하기로 다음으로~

     

     

    3. RxPy 사용하기 🧑‍💻


    그래서 우리는 비동기 프로그래밍을 위해서 Rx를 써야 한다고 이해했습니다. 뭐 기존에도 비동기 프로그래밍을 할 수 없었던 것은 아니죠! callback으로 하던 여러 가지가 있는데 Rx는 비동기 프로그래밍을 더 간단하게 할 수 있다는 것 같아요 (연산자도 많고, 지원하는 언어도 많아서)

     

    그리고 Rx는 지원하는 언어가 정말 많아요! RxJava, RxPy(python), RxScala 등등 그래서 Rx에 대한 기본 개념만 있으면 어떤 언어에서든 비슷하게 사용할 수 있습니다. (지원하는 언어들)

     

    저는 python을 주로 사용하기 때문에 RxPy를  사용하겠습니다.

     

     기본 개념 설명

    Observable과 Observer 관계

    (이미지 출처: github)

    1. Observable : 데이터 스트림을 생성하는 객체

    2. Observer: 데이터 스트림을 구독하여 사용하는 객체

    Observable은 onNext, onCompleted(언어별로 약간 다름), onError를 통해 Observer에게 무언가를 전달합니다.

     

    - onNext : observable은 item을 방출할 때 onNext를 call 합니다. onNext는 observable이 방출한 아이템을 parameter로 사용합니다.

    - onCompleted: 에러가 없는 경우 마지막 onNext를 호출하고 난 다음 이 메서드를 call 합니다.

    - onError: 예상한 데이터를 생성 못했거나 오류가 발생했을 때 call 됩니다. onError는 onNext 또는 onCompleted를 더 이상 호출하지 않습니다.

     

    기본적인 내용은 이렇습니다! 역시 개발자는 코드로 확인해봐야겠죠?!😀

    pip3 install rx


    rx를 설치해줍니다.

    Rx 연산자들 (출처: ReactiveX)

    rx의 연산자들은 위 이미지처럼 나와있습니다. 일단 처음으로 Create를 이용해서 observable을 만들어 보겠습니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    from rx import create
     
     
    def hello_rx(observer, sheduler):
        observer.on_next("Hello")
        observer.on_next("RxKidsnote")
        observer.on_next("everyone!!")
        observer.on_completed()
     
    source = create(hello_rx)
     

    hello_rx라는 observable을 만들었습니다. create에서는 함수를 파라미터로 받는데, 함수(hello_rx)의 기본 파라미터로 observer, scheduler를 받습니다. scheduler는 멀티스레딩을 적용하기 위한 파라미터입니다. 
    (일단 이번 포스팅에서는 이렇게만 알고 넘어가고, 자세한 걸 알고 싶으신 분들은 여기를 참고해 주세요!)

     

    hello_rx의 내부를 보면 on_next로 string을 받아옵니다. 그리고 on_next가 끝나면 on_completed로 끝을 알려줍니다.

    on_next 내부  로직

    궁금해서 한번 라이브러리 들어가 봤는데, 뭐 이런 식으로 생겼네요.

    이상태에서 코드를 실행하면 어떻게 될까요?! 

    아무알도 일어나지 않았다고 한다...

    네 소름 돋을 정도로 아무 일도 일어나지 않았습니다.

    왜냐하면 구독자가 없기 때문이죠! 기본적으로, observeble은 Subscribe 메소드가 호출(구독)돼야지 observer에게 알림을 보냅니다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    from rx import create
     
     
    def hello_rx(observer, scheduler):
        observer.on_next("Hello")
        observer.on_next("RxPy")
        observer.on_next("WAWAWA!!")
        observer.on_completed()
     
     
    def str_double(_str: str):
        print("{} {}".format(_str, _str))
     
     
    source = create(hello_rx)
    source.subscribe(lambda x: print(x))
    source.subscribe(str_double)
     

    자 이제 구독을 해보겠습니다. 일단 저는 두 가지 구독자(?)를 만들어서 구독을 했습니다. lamda로 만든 리턴 값을 출력해주는 함수와 str_double(작명 센스 ㅆㅎㅌㅊ)라는 리턴되는 string을 두 번 출력하는 함수 이 두 가지입니다.

    과연 결과는 어떨까요?

    RxPy로 실행한 결과

    오.. 제대로 호출이 되네요! 다만 비동기 프로그래밍인데 왜 절차적으로 출력되었을까? 의문이신 분들이 있을 겁니다. 그  이유는 바로 멀티스레드로 실행하지 않았기 때문이에요!! ㅎㅎ 이 부분은 다음 포스팅에서 다루도록 하겠습니다.

     

    그냥 여기서 끝내기는 아쉬우니까 다른 연산자를 하나 더 사용해보겠습니다.

    range라는  연산자를 사용해 볼게요. range는 python의 range와 같이 범위 내 integer값을 순차적으로 내보내는 연산자입니다.

     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    from rx import create, range as rx_range
     
     
    def rx_create_range(observer, scheduler):
        for idx in range(05):
            observer.on_next(idx)
        observer.on_completed()
     
     
    # rx-create와 range로 구현한 rx-range
    source = create(rx_create_range)
    source.subscribe(lambda x: print(x))
     
    # rx-range
    rx_range = rx_range(05)
    rx_range.subscribe(lambda x: print(x))
     
     
     

    create로 구현한 range 연산자(스터디 숙제 ㅋㅋ) 와 range 연산자로 숫자를 출력하는 코드입니다.

    순서대로 출력된 정수들

    제대로 출력되네요! 다른 연산자들이 궁금하시다면 홈페이지를 참고하여 한번 해보시면 좋을 것 같습니다!

     

     

    4. 마치며 👋


    제가 지식이 부족해서 제대로 내용을 전달했는지 모르겠네요ㅎㅎ(혹시 설명이 잘못된 부분이 있다면 댓글로 남겨주시면 감사하겠습니다🙏)일단 간단히 Reactive ProgrammingRx에 대해서  알아봤는데요. 저는 backend에서 Rx를 어떻게 응용할 수 있을지 고민을 좀 해봐야 할 것 같습니다.

     

    현재 저희 회사에서는 celery로 비동기 처리를 하고 있어서, RxPy를 적용한다면 어느 부분에 적용할 수 있을지 고민이네요.

    확실히 클라이언트  단에서는 여러 api가 호출되는 동안 유저에게 멈춘 화면이 아닌 다른 화면을 보여줘야 하기 때문에 비동기 처리 부분에서 Rx가 도움이 많이 될 것 같아요.

     

    무튼, 다음에는 멀티스레드 관련된 내용 혹은 Hot, Cold Observables에 대해 설명해보겠습니다.

    오늘도 재밌는 코딩 하세요:D

    댓글

운동하는 개발자 JAY-JI