-
[RxPy] Operator 응용과 Custom Operator 만들기(feat. 메서드 체이닝(Method Chaning))💻 프로그래밍/Python 2020. 6. 29. 02:56
안녕하세요! 운동하는 개발자 JAY입니다. 오늘은 RxPy 연산자를 응용해서 몇 가지 문제를 풀어보도록 하겠습니다.
RxPy에 대한 기본 설명은 여기를 참고해주시기 바랍니다.
Rx에서 제공하는 연산자들이 궁금하다면 여기에서 확인해 주세요!
문제를 풀기전 메서드 체이닝에 대해서 알아보고 가겠습니다!
1. 메서드 체이닝? 그게 뭐입니까 휴먼? 🤔
일단 메서드 체이닝이란 method().filter().map() 이런 식으로 메서드를 연속적으로 이어서(체이닝) 쓸 수 있는 방식을 말합니다.
다만 RxPy에서는 메서드 체이닝을 지원하지 않습니다. RxJava 와같은 언어도요 ㅎㅎ
메서드 체이닝을 지원하지 않는 다면, 여러 Observable을 연속적으로 사용할 때 매우 비효율적으로 사용하게 됩니다.
예를 들면 map(filter(method())) 이렇게? ㅎㅎ 뭐 몇 가지 정도야 이렇게 써도 큰 상관은 없으나, 더 많아진다면....ㅎㅎ
대신 RxPy에서는 pipe라는 것을 이용해 multiple operators를 할 수 있도록 해줍니다.(RxJava에서는 Compose를 사용)
위 내용은 여기서 확인 가능합니다.
자 이제, 실제 문제에서 pipe와 여러 연산자들을 사용해보겠습니다!
2. 여러 연산자들을 활용하여 문제 풀어보기 ✍
📌 range(0, 100) 연산자를 사용하여 짝수면 EVEN 홀수면 ODD를 출력하는 변환 연산자를 만들어 보시오.
지난 포스팅에서는 create와 range연산자를 사용해봤습니다.
문제에서는 이 숫자가 짝수이면 EVEN 홀수면 ODD로(string)으로 출력하는 변환 연산자를 구현하라고 했습니다.
12345from rx import range as rx_rangerx_range(0, 100).subscribe(lambda x: print('ODD : {}'.format(x)) if x % 2 else print('EVEN : {}'.format(x)))간단하게 lambda에서 조건문을 사용해서 문제를 풀 수 있습니다.
근데 사실 이건 변환 연산자를 만든 게 아니라 range 연산자에서 주는 값을 단순히 구독했을 때 조건에 맞게 출력해주는 것입니다.
우리가 원하는 건 EVEN, ODD라는 string값을 받는 것입니다. 그래서 아래와 같이 해줘야 합니다.
1234567from rx import range as rx_range, operators as oprx_range(0, 100).pipe(op.map(lambda x: 'ODD : {}'.format(x) if x % 2 else 'EVEN : {}'.format(x))).subscribe(lambda x: print(x))처음에 이야기했던 multiple operators를 위해 pipe를 사용했습니다. 그리고 map 연산자를 이용해서 int형 데이터를 lambda 함수에 맞게 string으로 변환해서 리턴합니다. (map 연산자는 데이터를 다른 타입으로 변환해 주는 연산자입니다.)
디버그 찍어보니 이렇게 string으로 받는 것을 확인할 수 있습니다. 결과는 역시 제대로 나왔겠죠?!
📌 range(0, 100)을 받아서 제곱의 값이 10 초과, 4000 미만인 값만 방출하는 변환 연산자를 작성해 보시오.
이번에는 제곱 값이 10 초과, 4000 미만인 값만 방출하는 변환 연산자를 만들어야 합니다. 이럴 때 사용할 수 있는 연산자 중에 filter 연산자가 있습니다.
filter는 말 그대로 필터링을 해주는 연산자입니다.
1234567from rx import range as rx_range, operators as oprx_range(0, 100).pipe(op.filter(lambda x: 10 < pow(x, 2) < 4000)).subscribe(lambda x: print("{} - pow: {}".format(x, pow(x, 2))))정말 간단하죠? 이전 포스팅에서도 말했지만 Rx는 다양한 연산자를 제공해주고 이런 연산들의 data stream을 쉽게 확인할 수 있습니다.
비동기 프로그래밍할 때 데이터의 흐름을 쉽게 파악할 수 있다는 장점이 되겠죠?
(아직 실무에서 사용해보진 못했지만, 이런 문제로만 봐서도 어떻게 데이터의 입력이 마지막 방출까지 어떤 흐름으로 되는지 쉽게 파악이 가능해 보입니다.)
📌 1초마다 “Ping”과 “Pong”을 번갈아 출력하는 Observable을 만들어 보시오.
오 이번에는 조금 어려워 보이는 문제네요.
일단 1초마다 무언갈 방출해야 하는데, 이런 Observable이 있는지 한번 찾아보겠습니다.
오... 갓 Rx... 없는 게 없는!! ㅋㅋㅋ Interval은 특정 시간을 간격으로 interger를 방출합니다.
자, 그럼 우리는 1초마다 방출되는 정수를 이용해서 ping, pong을 출력하면 되겠죠?
12345from rx import operators as op, intervalinterval(1).pipe(op.map(lambda x: 'pong' if x % 2 else 'ping')).subscribe(lambda x: print(x))짝수, 홀수 문제에서 사용한 것처럼 조건문을 갖게 하면 1초마다 ping, pong을 번갈아 가면서 출력이 가능하겠죠?! 그리고 코드를 실행시켜보면?!
엇... 근데 결과가... 아무것도 안 나오네요. 코드가... 잘못된 걸까요??
사실 코드가 잘못된 건 아니고, intrerval은 결과를 방출하려고 하는데 프로그램이 바로 종료되어 버려서 결과를 확인할 수 없었던 것입니다. sleep을 넣어서 다시 한번 결과를 확인해 볼게요!
1234567from time import sleepfrom rx import operators as op, intervalinterval(1).pipe(op.map(lambda x: 'pong' if x % 2 else 'ping')).subscribe(lambda x: print(x))sleep(10)제대로 출력되는 것을 확인할 수 있습니다!! 꼭 저렇게 해야만 되는 건 아니고, 다른 연산자들을 활용해서도 똑같은 결과를 만들 수 있습니다!
📌 짝수번째 데이터만 방출하는 OnlyEven 함수를 만들어 보시오.
오... 짝수가 아니라 짝수번째 데이터만이라... 이것도 왠지 Rx에서 지원해줄 것 같아서 찾아보다가, fiflter 쪽에서 이런 것을 발견했습니다!
오.. 대충 설명만 봐도 index를 가지고 무얼 한다고 하는 것 같네요. 한번 이걸로 문제를 풀어보겠습니다.
1234567from rx import operators as op, from_test_list = ['A', 'B', 'C', 'D', 'E', 'F', 'G']from_(test_list).pipe(op.filter_indexed(lambda x, idx: idx % 2)).subscribe(lambda x: print(x))일단 from_(from_iterable) observable은 iterable 데이터를 순서대로 방출하는 observable입니다.
그리고 filter_indexed 연산자로 짝수번째 index인 데이터만 방출하도록 하였습니다.
자 이렇게, 짝수번째 데이터만 방출된 걸 확인할 수 있습니다.
📌 문자열 데이터를 입력받아서 한 글자씩 방출하는 데이터스트림으로 변환하는 to_char_stream() 메소드을 만들어 보시오.
이번 문제는 조금 어려운 문제라고 생각합니다. Custom Operator를 만들어야 하기 때문이죠.
사실 문자열을 한글자씩 방출하는 것은 위 문제에서 사용한 from_을 사용하면 가능합니다.
123from rx import from_from_('jay').subscribe(lambda x: print(x))하지만 우리는 직접 만들어야 합니다; ㅎㅎ 다행히 친절하게 문서에 Custom operator를 만드는 예제가 있었습니다.
Custom Operator에 대한 내용은 여기를 클릭하세요.
일단 operator를 저런 식으로 만들면 된다는 건 알았습니다. 예제와 비슷하게 한번 Custom Operator를 구현해보겠습니다.
123456789101112131415161718192021222324from rx import create, ofdef to_char_stream():def _to_char_stream(source):def subscribe(observer, scheduler):def on_next(value):for v in value:observer.on_next(v)return source.subscribe(on_next,observer.on_error,observer.on_completed,scheduler)return create(subscribe)return _to_char_streamof("Jay").pipe(to_char_stream()).subscribe(lambda value: print("{0}".format(value)))일단 설명을 하자면 of라는 observable은 from_과 같은 observable입니다.
라이브러리 찾아가 보면 둘 다 from_iterable이라는 observale을 반환합니다. 뭔가 이유가 있긴 할 텐데, 일단 pass...(별론 궁금하진 않으므로)
of(from)은 입력받은 값들을 순서대로 방출하는데, 생각해보니 문자열 하나만 받을 거니까 just를 사용하는 게 더 맞는 듯하네요;;ㅎㅎ
어찌 됐건 ㅎㅎ 문자열을 받아서 to_char_stream()이라는 custom operator를 pipe에 넣어주면 문자열을 받아서 on_next()를 통해 순서대로 문자를 하나씩 방출하게 됩니다.
근데, 전 여기서 왜 저렇게 Custom oprator를 꼭 만들어 줘야하는건가? 라는 의문점이 생겼습니다.
꼭 lowercase() 안에 _lowercase를 리턴해줘야 하는 것인가?
그래서 wrapper함수(to_char_stream)를 지우고 위 이미지처럼 해봤더니 에러가 나는 겁니다. ㅎㅎ
그래서 pipe 함수를 들어가 보니...
혹시 이제 이해가 되시나요??ㅋㅋ 이렇게 보고 이해되신다면 그대는 pythonic 하신 분...ㅋㅋ 농담이고요 ㅎㅎ
일단 Exmaples 보면 어느 정도 파악이 되는데 pipe안에는 값이 아닌 함수(정확히는 operator)를 넣어줘야 합니다.
그래서 위 예제에서 _to_char_stream을 반환하는 to_char_stream함수를 사용했던 것입니다.
그다음 pipe에는 compose 함수가 있습니다. 맞습니다! 이 compose가 RxJava에서 사용하는 compose 역할입니다 ㅎㅎ
python에서는 reduce 함수로 만들어주네요.
python의 functools.reduce 함수는 여러 개의 데이터를 대상으로 누적집계를 내고 싶을 때 사용합니다.
설명에 보시면 ((((1 + 2) +3) +4) +5) 이 부분이 메소드 체이닝을 나타내고 있는 것 같지 않나요? ㅋㅋ
뭐 이걸 의도해서 만든 함수인지는 모르겠지만, RxPy의 Pipe에서는 reduce를 사용해 compose를 구현했네요.
reduce(lambda obs, op: op(obs), operators, source)
그래서 위 구문을 보면 source들을 받아서 op(operator)에 넣어주고 있습니다. 여기서 가장 오른쪽 source는 초기값인데, 제가 만든 로직에서는 of겠네요.
쉽게 예제를 만들어서 pipe에서 받는 operator 형식(?)을 재현해봤습니다.
123456789101112131415161718from functools import reducedef add():def _add(a):return a + 1return _adddef _pipe(funcion):operators = funcionsource = 1return reduce(lambda obs, op: op(obs), [operators], source)result = _pipe(add())print(result)오 비슷하지 않나요?? 그래서 정리하자면 RxPy의 pipe에서는 함수 즉, operator를 받아야 하기 때문에 operator를 반환하는 함수로 wrpping을 했던 것입니다.
123456789101112131415161718def _to_char_stream(source):def subscribe(observer, scheduler):def on_next(value):for v in value:observer.on_next(v)return source.subscribe(on_next,observer.on_error,observer.on_completed,scheduler)return create(subscribe)just("Jay").pipe(_to_char_stream).subscribe(lambda value: print("{0}".format(value)))그래서 이렇게 wrapping을 안 하고 바로 operatior를 사용해도 됩니다.
하지만, 이렇게 사용 안 하고 문서에서 wrapping을 해서 custom operator를 만든 건 실제로 RxPy에서 사용하는 모든 operator들이 내부에서 operator를 반환하도록 wrraping한 함수이기 때문입니다.
그래서 결론은 처음처럼 wrapping 해서 사용해야 한다!입니다 ㅋㅋㅋ
아 뭔가 이야기가 딴 곳으로 샌 것 같지만, 하다가 그냥 궁금해서 찾아보다 보니 이렇게 됐네요 ㅋㅋㅋ
3. 마치며 👋
오늘은 RxPy의 연산자(Operator)들을 응용해보고 커스텀 연산자(Custom Operator)를 만들어서 사용해 봤는데요, 상당히 재밌었던 것 같습니다. 하다 보니 데이터 체이닝, 그리고 reduce까지 알게 되고 ㅋㅋㅋ
여러분들도 이 글을 통해 RxPy가 재밌어지셨으면 좋겠네요ㅋㅋ 오늘도 즐거운 코딩 하세요~~👨💻
'💻 프로그래밍 > Python' 카테고리의 다른 글
[RxPy] 디버깅, 오류 처리하기 (2) 2020.07.15 [RxPy] CPU Concurrency와 Muliti Thread로 Rx하기 (1) 2020.07.14 [RxPy] Reactive Programming? Rx? 그게뭐야? (2) 2020.06.23 [BitBar] 오픈소스 플러그인 내맘대로 만들어보기 (Mac OS 메뉴바 플러그인) (0) 2020.06.14 [Python]메타클래스 & 인스턴스, 클래스, 스태틱 메소드 개념 정리 (2) 2018.04.30