💻 프로그래밍/Python

[RxPy] Reactive Programming? Rx? 그게뭐야?

피트웨어 제이 (FitwareJay) 2020. 6. 23. 02:30

 

안녕하세요! 개발자 JAY 입니다! 오늘은 Reactive Programming 그리고 Rx에 대해서 이야기 해보겠습니다.

 

처음 Rx에 대해서 들어본 건 iOS 개발자 동료들이 RxSwift를 맨날 이야기하길래 ㅋㅋ 궁금해서 물어봤더니 비동기 프로그래밍 관련된 내용이라고 하더라고요! 그리고 요즘 유행(?)이라고 ㅋㅋㅋ

 

그래서 "아~그런가 보다" 하고 지나가다가 회사 안드로이드 개발자분이 Rx스터디를 열어서 바로 한다고 했습니다.

(평소 Rx가 궁금하긴 했지만 의지박약이라 실천에 못 옮겼었는데.. 매우 잘.. 되었습니다 ㅋㅋ)

 

 

1. What is Reactive Programming? 🤔


먼저 Reactive Progrming에 대해서 간단히 알아보겠습니다. 늘 그렇듯 위키피디아 를 참고하였습니다.

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.


Reactie Programming
(반응형 프로그래밍) 은 데이터 스트림변화의 전달(데이터 스트림에서 값이 변했을 때  전달이 이루어지는 것)에 대한 선언적 프로그래밍 패러다임이다.

 

흠.... 사실 이렇게만 보면 잘 모르겠는데, 좀 더 위키에 있는 내용을 코드로 설명하자면,

명령형 프로그래밍에서 a = b + c 를 실행하면 b + c 의 결과가 할당되고, b, c 의 값의 변화는 a에 영향을 주지 않습니다. 반면, 리액티브 프로그래밍에서는 b 와 c의 값이 변경될 때마다  a = b + c를 실행하지 않고 자동으로 업데이트되어 a의 값을 결정합니다.

 

그래서 요약하자면 Reactive Programming 은 데이터 스트림을 비동기적인 흐름으로 처리하는 프로그래밍(?)이라고 볼 수 있을 것 같습니다. 

 

 

2. 그럼 Rx는 뭐지? 🤷‍♂️


RxReactiveX의 줄임말로 Rx홈페이지에 가면 이렇게 나와있네요.

An API for asynchronous programming with observable streams.


(observable streams이라고 하는데 Observer pattern 기반인 것 같습니다. 관련 여기서 확인해주세요!)

 

marble diagrams (출처: ReactrivX)

블라블라 영어로 뭐라 쓰여있지만, 우리는 Rx가 비동기 프로그래밍을 위한 API 정도로만 알면 될 것 같습니다! 

여기서 Observer pattern까지 알면 좋지만, 이건 따로 공부하기로 다음으로~

 

 

3. RxPy 사용하기 🧑‍💻


그래서 우리는 비동기 프로그래밍을 위해서 Rx를 써야 한다고 이해했습니다. 뭐 기존에도 비동기 프로그래밍을 할 수 없었던 것은 아니죠! callback으로 하던 여러 가지가 있는데 Rx는 비동기 프로그래밍을 더 간단하게 할 수 있다는 것 같아요 (연산자도 많고, 지원하는 언어도 많아서)

 

그리고 Rx는 지원하는 언어가 정말 많아요! RxJava, RxPy(python), RxScala 등등 그래서 Rx에 대한 기본 개념만 있으면 어떤 언어에서든 비슷하게 사용할 수 있습니다. (지원하는 언어들)

 

저는 python을 주로 사용하기 때문에 RxPy를  사용하겠습니다.

 

 기본 개념 설명

Observable과 Observer 관계

(이미지 출처: github)

1. Observable : 데이터 스트림을 생성하는 객체

2. Observer: 데이터 스트림을 구독하여 사용하는 객체

Observable은 onNext, onCompleted(언어별로 약간 다름), onError를 통해 Observer에게 무언가를 전달합니다.

 

- onNext : observable은 item을 방출할 때 onNext를 call 합니다. onNext는 observable이 방출한 아이템을 parameter로 사용합니다.

- onCompleted: 에러가 없는 경우 마지막 onNext를 호출하고 난 다음 이 메서드를 call 합니다.

- onError: 예상한 데이터를 생성 못했거나 오류가 발생했을 때 call 됩니다. onError는 onNext 또는 onCompleted를 더 이상 호출하지 않습니다.

 

기본적인 내용은 이렇습니다! 역시 개발자는 코드로 확인해봐야겠죠?!😀

pip3 install rx


rx를 설치해줍니다.

Rx 연산자들 (출처: ReactiveX)

rx의 연산자들은 위 이미지처럼 나와있습니다. 일단 처음으로 Create를 이용해서 observable을 만들어 보겠습니다.

1
2
3
4
5
6
7
8
9
10
from rx import create
 
 
def hello_rx(observer, sheduler):
    observer.on_next("Hello")
    observer.on_next("RxKidsnote")
    observer.on_next("everyone!!")
    observer.on_completed()
 
source = create(hello_rx)
 

hello_rx라는 observable을 만들었습니다. create에서는 함수를 파라미터로 받는데, 함수(hello_rx)의 기본 파라미터로 observer, scheduler를 받습니다. scheduler는 멀티스레딩을 적용하기 위한 파라미터입니다. 
(일단 이번 포스팅에서는 이렇게만 알고 넘어가고, 자세한 걸 알고 싶으신 분들은 여기를 참고해 주세요!)

 

hello_rx의 내부를 보면 on_next로 string을 받아옵니다. 그리고 on_next가 끝나면 on_completed로 끝을 알려줍니다.

on_next 내부  로직

궁금해서 한번 라이브러리 들어가 봤는데, 뭐 이런 식으로 생겼네요.

이상태에서 코드를 실행하면 어떻게 될까요?! 

아무알도 일어나지 않았다고 한다...

네 소름 돋을 정도로 아무 일도 일어나지 않았습니다.

왜냐하면 구독자가 없기 때문이죠! 기본적으로, observeble은 Subscribe 메소드가 호출(구독)돼야지 observer에게 알림을 보냅니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from rx import create
 
 
def hello_rx(observer, scheduler):
    observer.on_next("Hello")
    observer.on_next("RxPy")
    observer.on_next("WAWAWA!!")
    observer.on_completed()
 
 
def str_double(_str: str):
    print("{} {}".format(_str, _str))
 
 
source = create(hello_rx)
source.subscribe(lambda x: print(x))
source.subscribe(str_double)
 

자 이제 구독을 해보겠습니다. 일단 저는 두 가지 구독자(?)를 만들어서 구독을 했습니다. lamda로 만든 리턴 값을 출력해주는 함수와 str_double(작명 센스 ㅆㅎㅌㅊ)라는 리턴되는 string을 두 번 출력하는 함수 이 두 가지입니다.

과연 결과는 어떨까요?

RxPy로 실행한 결과

오.. 제대로 호출이 되네요! 다만 비동기 프로그래밍인데 왜 절차적으로 출력되었을까? 의문이신 분들이 있을 겁니다. 그  이유는 바로 멀티스레드로 실행하지 않았기 때문이에요!! ㅎㅎ 이 부분은 다음 포스팅에서 다루도록 하겠습니다.

 

그냥 여기서 끝내기는 아쉬우니까 다른 연산자를 하나 더 사용해보겠습니다.

range라는  연산자를 사용해 볼게요. range는 python의 range와 같이 범위 내 integer값을 순차적으로 내보내는 연산자입니다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from rx import create, range as rx_range
 
 
def rx_create_range(observer, scheduler):
    for idx in range(05):
        observer.on_next(idx)
    observer.on_completed()
 
 
# rx-create와 range로 구현한 rx-range
source = create(rx_create_range)
source.subscribe(lambda x: print(x))
 
# rx-range
rx_range = rx_range(05)
rx_range.subscribe(lambda x: print(x))
 
 
 

create로 구현한 range 연산자(스터디 숙제 ㅋㅋ) 와 range 연산자로 숫자를 출력하는 코드입니다.

순서대로 출력된 정수들

제대로 출력되네요! 다른 연산자들이 궁금하시다면 홈페이지를 참고하여 한번 해보시면 좋을 것 같습니다!

 

 

4. 마치며 👋


제가 지식이 부족해서 제대로 내용을 전달했는지 모르겠네요ㅎㅎ(혹시 설명이 잘못된 부분이 있다면 댓글로 남겨주시면 감사하겠습니다🙏)일단 간단히 Reactive ProgrammingRx에 대해서  알아봤는데요. 저는 backend에서 Rx를 어떻게 응용할 수 있을지 고민을 좀 해봐야 할 것 같습니다.

 

현재 저희 회사에서는 celery로 비동기 처리를 하고 있어서, RxPy를 적용한다면 어느 부분에 적용할 수 있을지 고민이네요.

확실히 클라이언트  단에서는 여러 api가 호출되는 동안 유저에게 멈춘 화면이 아닌 다른 화면을 보여줘야 하기 때문에 비동기 처리 부분에서 Rx가 도움이 많이 될 것 같아요.

 

무튼, 다음에는 멀티스레드 관련된 내용 혹은 Hot, Cold Observables에 대해 설명해보겠습니다.

오늘도 재밌는 코딩 하세요:D