💻 프로그래밍/Python

비동기로 Third-party API 처리하기 (feat. aiohttp, asyncio)

피트웨어 제이 (FitwareJay) 2021. 4. 7. 16:13

안녕하세요! 운동하는 개발자 제이입니다! 오늘은 비동기로 Third-party API를 처리하는 방법, 그리고 제가 왜 비동기를 사용했는지에 대한 과정을 설명해보겠습니다.

 

제가 비동기에 관심을 갖게 된 이유는 현재 개발 중인 토이 프로젝트에서 Github API를 호출하기 때문입니다. 근데 단순히 1번 호출해서 사용하는 게 아니라 Github 사용자에 따라 10번 일수도 100번 일수도 있습니다.

 

현재 개발 중인 토이 프로젝트는 등록한 Github 유저의 정보를 가지고 API를 호출해서 정보를 보여주는 서비스입니다. 여기에 DB에 read, write 등의 작업들을 하게 되면 엄청난 시간이 걸리게 됩니다. 

 

지금까지 가장 오래 걸리는 user의 경우 43초 정도가 걸렸네요... 코드와 실행결과를 보면서 좀 더 자세하게 이야기해보겠습니다.

 

 

1. 문제의 코드 🤡


for repository in repositories:
    res = requests.get(repository.get('contributors_url'), headers=self.headers)
 
    if res.status_code == 451:
        # 레포지토리 접근 오류('Repository access blocked') - 저작원에 따라 block 될 수 있음
        continue
 
    if res.status_code != 200:
        self.update_fail(res)
 
    for contributor in json.loads(res.content):
        if self.github_user.username == contributor.get('login'):
            self.repositories.append(repository)
            break
 
cs

Github Repository API를 호출하게 되면 User가 가지고 있는 repository에 대한 정보를 반환합니다.

위 로직을 설명하자면 repositories에는 repository 정보들이 들어 있습니다. for loop에서는 각 repositorycontributor 중 본인이 속해있으면 repositoryself.repositories에 저장하는 로직입니다.

 

이 과정에서 매번 컨트리뷰터 정보가 있는 Github Contributor API를 호출하게됩니다. for loop의 횟수가 100번이면 100번 API를 호출하게 됩니다. 이 과정을 동기(sync)로 처리할 경우 엄청난 시간을 소비하게 됩니다. (저런 로직이 1개 더 있음)

 

그럼 실제로 이 로직이 있는 class를 실행해 보겠습니다.

github_information_service = GithubInformationService(username='깃헙아이디')
github_information_service.update()
 
cs

일단 요런 식으로 로직들을 실행하게 됩니다.

 

33초 걸림...ㅋㅋㅋ

33초나 걸렸습니다. 물론 위에 첨부한 로직 외에도 여러 로직이 있긴 한데... 너무 오래 걸리긴 하네요.

 

 

2. 문제 해결 🛠


사실 업데이트에 오래 걸리는 서비스(?) 이긴 하지만, 여기서 좀 더 시간을 단축하기 위해 비동기(async)로 처리해보겠습니다. 비동기 처리를 위해 python 3.4 (?)부터였나... 암튼 그때부터 지원하기 시작한 asyncio와 requests 모듈 말고 비동기 http를 처리하는 aiohttp를 사용하기로 했습니다.

 

asyncioaiohttp에 대한 세부 내용은 따로 글을 써보겠습니다. 이번 글에서는 간단한 설명과 실질적으로 얼마나 속도가 향상되었고, django에서 어떻게 사용되는지를 확인해보겠습니다.

 

📌 비동기 로직으로 수정

loop = asyncio.get_event_loop()
loop.run_until_complete(self.save_organization_repository(repositories))
 
cs

먼저 첫 번째 라인은 get_event_loop()는 현재 실행 중인 event loop를 가져옵니다. 그리고 run_until_complete()self.save_organization_repository()가 끝날 때까지 기다립니다. 좀 더 정확히 설명하자면 future라 불리는 instance의 실행이 끝날 때까지 기다립니다. future는 비동기 작업의 결과를 나타낸다고 합니다. 

 

어디서 봤는데, JS의 promise(약속)처럼 future는 미래의 실행될 결과(?) 뭐 이런 느낌이라서 future라고 네이밍 했다는 말이 있다고 하네요. 정확한 건 아니고 어디서 봤... 카더라 ~ ㅎㅎ

 

📌 async 함수와 aiohttp로 호출되는 Github API

 

async def append_repository(self, repository):  # 코루틴 정의
    async with aiohttp.ClientSession() as session:
        async with session.get(repository.get('contributors_url'), headers=self.headers) as res:
            response_text = await res.text()
 
            if res.status == 451:
                # 레포지토리 접근 오류('Repository access blocked') - 저작원에 따라 block 될 수 있음
                pass
            elif res.status != 200:
                # self.update_fail(res)
                pass
            else:
                for contributor in json.loads(response_text):
                    if self.github_user.username == contributor.get('login'):
                        self.repositories.append(repository)
                        break
 
async def save_organization_repository(self, repositories):
    futures = [asyncio.ensure_future(
        self.append_repository(repository)) for repository in repositories
    ]
 
    await asyncio.gather(*futures)
cs

self.save_organization_repository() 내부 로직을 살펴보겠습니다. 위에서 run_until_complete()future가 끝날 때까지 대기하는 함수라고 설명했는데,  ensure_future() 그런 future를 생성하는 함수입니다. 

 

for loop로 ensure_future()를 이용해 self.append_repository() -> future 인스턴스로 생성합니다. 한번 Debug 모드로 값을 확인해보겠습니다.

futures 리스트 디버그

잉? future라는 인스턴스들이 있을 줄 알았는데, Task라는 인스턴스들이 있네요?! 사실 async 관련 내용을 살펴보면 coroutine, Task, Future에 대한 내용이 있는데, 자세한 내용은 여기서 확인하면 좋을 것 같습니다.

 

Future를 상속한 Task

쉽게 Task는 Future의 Sub Class이고, 둘 다 awaitable 합니다. 일단 이 글에서는 둘 다 비슷한 개념이고 비동기를 위해 awaitable 한 객체라고 생각하면 될 것 같습니다.

 

다시 코드로 돌아와서, asyncio.gather()는 awaitable 한 객체를 실행합니다. asyncio 프로그램을 실행하기 위해 await를 사용합니다. 간단히 말하면 async 함수들을 실행한다는 의미입니다.

 

append_repository() 내부에서는 aiohttp로 Github api를 호출합니다. async를 앞에 선언하고 aiohttp.ClientSession()로 requests 모듈처럼 http 통신을 합니다.  그리고 get으로 정보를 가져와서 컨트리뷰터와 유저가 같으면 리스트에 저장하는 로직을 수행합니다.

(뭔가 자세한 내용을 건너뛰고 설명하려니 힘드네요 ㅠ)

 

여하튼 이렇게 asyncio, aiohttp를 사용해서 바뀐 로직이 동기로 처리한 로직보다 얼마나 빠른지 테스트해보겠습니다.

 

7초 걸렸다,...

7초가 걸렸습니다 ㅋㅋㅋㅋ 33초에서 7초... 거의 5배 가까이 속도가 향상되었네요.

 

async 짱짱맨

 

3. Django REST API에서 사용해보기 👨‍💻


스크립트 형식으로는 테스트해봤고 이제 Django REST API에 위 로직을 추가해보겠습니다.

def update(self, request, *args, **kwargs):
    username = self.kwargs.get(self.lookup_url_kwarg)
    response_data = {}
 
    try:
        github_user = GithubUser.objects.filter(username=username).get()
 
        # 업데이트 한지 하루가 지나야지 재업데이트
        if github_user.updated + timedelta(1>= datetime.now():
            response_data = self.serializer_class(github_user).data
            return Response(response_data)
 
        github_information_service = GithubInformationService(username)
        user = github_information_service.update()
        response_data = self.serializer_class(user).data
 
    except GithubUser.DoesNotExist:
        raise exceptions.NotFound
 
    except RateLimit:
        raise RateLimitGithubAPI()
 
    return Response(response_data)
cs

patch를 호출하면 업데이트를 실행하는 로직입니다. Postman으로 api를 호출해보겠습니다!!

 

응? 뭔가 이상한 오류가...

역시 한 번에 되는 게 없네요. 하지만 여러분! 모든 에러는 에러 메시지에 정답이 있습니다!

There is no current event loop in thread 'Thread-1'.

Thread-1에 이벤트 루프가 없다고 하네요?!  음... 우리 get_event_loop()로 event loop를 가져왔던 것 같은데 무슨 일 지이?!

 

바로 이번 글의 핵심입니다!! ㅋㅋㅋ 디버깅을 한번 찍어 보겠습니다. 일단 저 오류가 어디서 실행되는지 검색해보겠습니다.

get_event_loop() 내부에서 생기는 오류

음... 왜 그럼 아까 스크립트로 실행했을 때는 되고, django 내부에서 실행했을 때는 오류가 나는 걸까...?  생각을 해보다가 바로 위 로직을 한번 살펴보니 현재 스레드가 메인 스레드인 경우만 self.set_event_loop()로 새로운 event loop를 생성하네요. 엇... 그럼?!

 

좀 더 위로 올라가서 get_event_loop()가 있는 class의 설명을 확인해 보았습니다.

기본 이벤트 루프 정책 클래스...라고 한글로 읽기

음... 영어는 잘 못하지만 대~~ 충 살펴보면, 

이 정책에서 각 스레드는 한 가지 이벤트 루프를 갖는다. 그러나 우리는 오직 메인 스레드만 디폴트로 이벤트 루프를 자동으로 생성해준다. 다른 스레드는 디폴트로 이벤트 루프를 가지고 있지 않는다.

오... 근데 맞는 말인 게 djangomain thread로 실행되고 있고 내부에서 실행되는 async thread의 경우 main thread가 아니기 때문에 오류가 났던 거였습니다. 글로 설명되어있지만 전 실제로 봐야 하는 성격이라 디버깅을 찍어 보겠습니다.

 

django script 로 실행한 경우
django api server에서 api호출로 실행된 경우

오오... 정말 다르네요... 그럼 이경우 어떻게 해줘야 할까요? 새로운 스레드를 만들라고 했는데, 저기 보이는 self.set_event_loop()self.new_event_loop()를 이용하면 될 것 같네요. 사실 문서에도 나와있지만 눈치껏 저걸 이용하면 될 것 같다는 생각이 드네요.

 

 

4. 마치며 👋


회사에서 async를 사용해본 적이 없어서 뭔가 실질적으로 서비스 내에서 활용해 보고 싶었는데, 토이 프로젝트를 통해 해 볼 수 있어서 재밌었던 것 같네요.

 

python의 async는 GIL과 여러 가지 의미들을 파악 해야 해서 처음에는 조금 헷갈렸던 것 같습니다. 그 안에서 Future, Task, coroutine이라는 개념들도 있고요.

 

다음 포스팅에서는 asyncio에 대해서 좀 더 세부적으로 공부해보도록 하겠습니다. 

그럼 오늘도 즐거운 코딩 하세요~