Redux Saga - WebSocket(socket.io) 이벤트 처리


React(이하 리액트), Redux(이하 리덕스) 생태계에서 소켓과 같은 외부 이벤트 처리는 개발에 많은 고민을 안겨준다. 똑같은 외부 이벤트가 발생하더라도 앱의 상태에 따라 다르게 처리하고 싶을 수도 있고, 무시하고 싶을 수도 있으며, 앱의 상태와 관계없이 항상 처리하고 싶을 수도 있다. 특히 외부 이벤트가 리덕스의 액션과 연결되는 경우 리덕스의 미들웨어를 통해 액션이 Dispatch 됐다는 것을 알아야 한다. 때문에 단순한 리덕스 스토어의 API만으로는 모든 서비스 로직을 처리하기가 상당히 까다롭다.

외부 이벤트 처리와 관련한 것 중 가장 자주 볼 수 있는 경우가 바로 소켓과의 연결이다. 리덕스 생태계에서는 소켓과 리덕스가 잘 어울릴 수 있도록 하는 많은 라이브러리가 있지만, 이 글에서는 Redux-Saga(이하 리덕스 사가 또는 사가)를 활용한 소켓 이벤트 처리를 알아보려 한다.

미리 알아야 할 것

  • 리덕스 - 리덕스 사가는 리덕스의 미들웨어다. 이전에 작성한 Redux 분석하기 글도 시간이 된다면 참고하자.
  • 리덕스 사가 - 이 글은 사가의 API를 활용하는 글이기 때문에 사가에 대한 기본적인 이해가 필요하다. 번역 글인 "redux-saga로 비동기처리와 분투하다."도 시간이 된다면 참고해보자. (아직은 리덕스-사가에 대한 한글로 작성된 글이 많이 없는 것 같다.)

소켓 이벤트 채널 만들기

리덕스 사가에서는 기본적으로 채널, 액션 채널, 이벤트 채널이라는 API를 제공한다. 소켓 이벤트를 연결하기 위해서는 이벤트 채널 API를 활용해볼 수 있다.

먼저 간단하게 소켓 이벤트 채널을 만들어주는 팩토리 함수를 만들어보자.

// createSocketChannel.js

import { eventChannel, buffers } from "redux-saga";
import socket from "../../../mySocket";

const defaultMatcher = () => true;

export function createSocketChannel(eventType, buffer, matcher) {
  return eventChannel(
    emit => {
      const emitter = message => emit(message);

      socket.on(eventType, emitter);
      return function unsubscribe() {
        socket.off(eventType, emitter);
      };
    },
    buffer || buffers.none(),
    matcher || defaultMatcher
  );
}

export function closeChannel(channel) {
  if (channel) {
    channel.close();
  }
}

이벤트 채널 API를 정말 간단하게 wrapping만 해도 소켓 채널 팩토리를 만들 수 있다. 그리고 이벤트 채널을 만들 때는 항상 unsubscribe 함수를 반환해야 하는 점은 잊지 말자.

buffermatcher는 이후 이어지는 내용에서 조금 더 자세히 알아볼 예정이니 우선은 무시하자.

소켓 채널 메시지 수신

소켓 채널 팩토리를 만들었으니 이제 소켓 메시지들을 수신해보자. 다음 간단한 코드로 소켓 메시지를 수신할 수 있다.

import { fork, take, call } from "redux-saga/effects";
import { createSocketChannel } from "./createSocketChannel";

function* onMessage(type) {
  const channel = yield call(createSocketChannel, type);

  while (true) {
    try {
      const message = yield take(channel);

      console.log(message);
    } catch (e) {
      alert(e.message);
    }
  }
}

export default function* rootSaga() {
  //...
  // "foo"와 "bar"라는 메시지 수신하기
  yield fork(onMessage, "foo");
  yield fork(onMessage, "bar");
}

리덕스 사가는 리덕스의 미들웨어이기 때문에 소켓 채널에서 메시지를 받아 액션을 다시 Dispatch 하거나, 다른 액션을 기다리거나, 에러 처리 혹은 또 다른 비동기 처리 등의 많은 작업을 손쉽게 처리할 수 있다.

소켓 채널 활용하기 - 특정 액션에서 메시지 수신

소켓 채널을 통해 메시지를 수신하는 방법까지 알아보았다. 이제 WAIT_FOO_MESSAGE_ONCE라는 액션이 발생하면 소켓에서 "foo" 타입의 메시지를 한 번만 수신해서 처리한다고 생각해보자.

import { takeEvery, take, call } from "redux-saga/effects";
import { createSocketChannel, closeChannel } from "./createSocketChannel";
import { WAIT_FOO_MESSAGE_ONCE } from "../../../action/message";

function* waitFooOnce() {
  let channel;

  try {
    channel = yield call(createSocketChannel, "foo");

    const message = yield take(channel);

    console.log(message);
  } catch (e) {
    alert(e.message);
  } finally {
    closeChannel(channel);
  }
}

export default function* rootSaga() {
  //...
  yield takeEvery(WAIT_FOO_MESSAGE_ONCE, waitFooOnce);
}

takeEvery를 통해 WAIT_FOO_MESSAGE_ONCE라는 액션에 waitFooOnce라는 Worker를 등록했다. 이제 WAIT_FOO_MESSAGE_ONCE 액션이 발생할 때마다 "foo" 메시지를 기다려서 처리할 수 있다!

만약 메시지를 대기하는 작업이 끝나지 않은 상태에서 또 같은 액션이 발생했을 때, 전부 무시하고 한번만 메시지를 수신하고 싶다면 takeEvery 대신 takeLatest를 활용할 수 있다.

소켓 채널 활용하기 - 타임아웃

소켓 채널에 대해 이해했다면 이제 timeout 로직을 구현하기도 매우 쉽다. 10초 동안 메시지를 기다리고, 메시지가 오지 않았다면 alert을 나타낸 후 소켓 메시지를 대기하는 작업을 종료시켜 보자.

waitFooOnce를 다음과 같이 변경할 수 있다.

import {delay} from 'redux-saga';
import {takeEvery, take, race, call} from 'redux-saga/effects';
import {createSocketChannel, closeChannel} from './createSocketChannel';
import {WAIT_FOO_MESSAGE_ONCE} from '../../../action/message';

function* waitFooOnce() {
  let channel;

  try {
    channel = yield call(createSocketChannel, 'foo');

    const {timeout, message} = yield race({
      timeout: delay(10000);
      message: take(channel);
    });

    if (timeout) {
      alert('timeout!!');
    } else {
      console.log(message);
    }
  } catch (e) {
    alert(e.message);
  } finally {
    closeChannel(channel);
  }
};

export default function* rootSaga() {
  //...
  yield takeEvery(WAIT_FOO_MESSAGE_ONCE, waitFooOnce);
}

race effect를 통해 타임아웃을 간단히 구현할 수 있었다. 만약 리덕스 사가를 사용하지 않고, 특정 액션이 발생했을 때 어떤 소켓 메시지를 타임아웃 설정과 함께 기다리도록 구현한다고 생각해보자. 상상하는 것만으로도 머리가 너무 아프다.

버퍼 활용하기

특정 액션이 발생하고, 그 액션에 알맞는 요청을 ajax로 처리 한 후 소켓 메시지를 기다린다고 생각해보자.

액션 Dispatch => ajax 처리 => 소켓 메시지 대기

문제는 ajax 요청과 소켓 메시지는 둘 다 비동기라서 순서가 꼬일 수 있다. 조금 더 세분화해서 보면 다음과 같은 두 가지 흐름이 발생할 수 있다.

  1. 액션 Dispatch => ajax 요청 => ajax 응답 => 소켓 메시지
  2. 액션 Dispatch => ajax 요청 => 소켓 메시지 => ajax 응답

만약 ajax의 응답과 소켓 메시지가 별개의 문제라면 상관없겠지만, 소켓 메시지가 이전에 호출한 ajax 응답의 결과와 연관되는 메시지라면 문제가 발생한다. 이런 경우를 대비하기 위해 버퍼를 활용할 수 있다.

다음 코드는 문제가 발생할 수 있는 경우이다.

channel = yield call(createSocketChannel, 'foo');

const result = yield call(api.requestFoo)

//... result에 따른 어떤 처리

const {timeout, message} = race({
  timeout: delay(10000),
  message: take(channel);
});

//.. message 따른 어떤 처리

ajax 응답이 도착하기 전에 소켓 메시지가 먼저 도착한다면 위 작업은 결국 메시지 타임아웃이 발생한다. 소켓 서버 입장에서는 분명히 메시지를 보냈는데도 불구하고 메시지가 도착하지 않은 것처럼 보이기 때문에 억울할 수 있다.

이제 버퍼를 활용해보자. 아까 팩토리를 작성할 때 버퍼를 전달받도록 했다는 점을 기억하자. 이벤트 채널의 버퍼는 buffers라는 API를 통해 만들 수 있다.

channel = yield call(createSocketChannel, 'foo', buffers.sliding(1));

const result = yield call(api.requestFoo);

//... result에 따른 어떤 처리

const {timeout, message} = race({
  timeout: delay(10000),
  message: take(channel);
});

//.. message 따른 어떤 처리

버퍼의 사이즈를 1로 설정하고, overflow가 발생하면 이전 메시지를 버리도록 sliding API(이 외에도 fixed, expanding, dropping이 있다.)를 사용했다. ajax의 응답이 도착하지 전에 메시지가 먼저 도착해도 버퍼에 저장되고, 이후 take로 가져올 수 있으니 메시지를 놓칠 걱정을 하지 않아도 된다!

Matcher 활용하기

Matcher 사용은 버퍼보다 훨씬 간단하다. 이벤트가 수신될 때 실제로 채널에 전달되어야 하는 이벤트만 거르는 필터 역할을 한다.

메시지에는 요청자 아이디가 있고, 애플리케이션은 메시지의 요청자가 자신일 때만 어떤 다른 처리를 한다고 생각해보자. 다음과 같이 Matcher를 작성할 수 있다.

const matcher = message => message.headers.requester === MY_ID;

function* onMyMessage() {
  const channel = yield call(createSocketChannel, "foo", null, matcher);

  while (true) {
    try {
      // 여기에서 받은 메시지는 전부 자신이 직접 요청한 메시지다.
      const message = yield take(channel);

      // ... 메시지 처리
    } catch (e) {
      alert(e.message);
    }
  }
}

// ...

소켓 채널에서 Matcher를 통해 필터링을 수행할 수 있기 때문에 소켓 메시지 리스너에서 별도의 분기 처리 로직들을 제거할 수 있다. 즉, 소켓 채널을 1개만 만들어서 리스너 내부에서 각 메시지에 따른 분기처리를 하기보다는 Mathcer를 활용해서 여러 개의 채널에서 각각의 서비스 로직을 처리하도록 하자.

빠르게 수신되는 메시지들에 대한 일괄 처리

리덕스-사가를 통해 소켓 메시지에 대한 처리를 손쉽게 할 수 있지만 그래도 문제는 여전히 발생한다. 특히 소켓 메시지가 빠르게 수신되고 그 메시지에 따른 어떤 액션을 dispatch 하는 경우 렌더링에 병목 현상이 발생한다. (관련 이슈 - "Huge performance issue when dispatching hundreds of actions")

간단하게 앱을 리액트로 렌더링하는 다음과 같은 경우를 생각해보자.

  1. 리스트에 아이템이 1000개가 있다.
  2. 사용자가 리스트 아이템 1000개 삭제 요청을 한다.
  3. 서버는 아이템 삭제를 비동기로 처리한다.
  4. 각 아이템 삭제에 대한 소켓 메시지가 수신된다.
  5. 아이템 삭제 메시지가 수신될 때마다 리스트 아이템 삭제 액션을 Dispatch 한다.
  6. 리액트가 아이템이 제거된 리스트의 렌더링을 완료하기 전에, 다음 아이템 삭제 메시지가 도착한다.
  7. 이 작업이 약 1000번 반복된다.

이런 경우, 성능 저하를 피할 수 없다. 간단하게 한 번 더 생각해보자.

  1. 첫 번째 아이템의 삭제 메시지가 도착했다.
    -> 이전 상태 리스트와 다음 상태 리스트를 비교한다.
    --> 리스트 아이템 엘리먼트 1000개를 비교한다.
    ---> 삭제된 아이템을 빼는 DOM 업데이트를 수행한다.
  2. 두 번째 아이템의 삭제 메시지가 도착했다.
    -> 이전 상태 리스트와 다음 상태 리스트를 비교한다.
    --> 리스트 아이템 엘리먼트 999개를 비교한다.
    ---> 삭제된 아이템을 빼는 DOM 업데이트를 수행한다.
  3. ...

아이템 1000개를 삭제하면, 일단 DOM의 렌더링 1000번은 무조건 요구한다. 그리고 리액트 리스트 컴포넌트에서만 각 메시지에 따라 각 아이템 엘리먼트를 비교하는 Reconciliation 작업도 있다. 물론 리액트나 브라우저가 내부적으로 어떤 일괄 처리 작업을 수행할 수도 있지만, Operation 자체가 1000번의 DOM 렌더링과 약 50000번 이상의 Reconciliation을 요구하는 것은 변하지 않는다(+ 리덕스의 모든 Change 리스너도 1000번씩 수행된다). 결국 1000개의 메시지가 2~3초 이내에 전부 도착한다면 성능 문제가 발생한다.

아래 코드를 보자. 평상시에는 별문제가 없겠지만, 많은 작업을 처리할 때는 큰 문제가 된다.

// Action creator
export function deleteItems(...ids) {
  return {
    type: DELETE_ITEMS,
    ids
  };
}
// Saga
const channel = yield call(createSocketChannel, 'deleteItem');

while (true) {
  const message = yield take(channel);

  yield put(deleteItems(message.itemId));
}

성능 문제를 해결하기 위해 먼저 생각해볼 방법은 Throttle이나 Debounce지만 이런 삭제와 같은 작업에는 적합하지 않다. 어쨌든 결과적으로는 1,000개의 아이템 삭제를 모두 처리하고 사용자에게 보여주어야 하는데, Throttle이나 Debounce는 의도적으로 메시지를 버리는 작업이라서 그 결과를 정확히 보여줄 수가 없다.

따라서 이런 경우에 약 100~200ms(앱마다 대기해야 할 시간은 다르므로 적정 시간은 꼭 측정해야 한다.) 정도는 메시지를 모았다가 한 번에 일괄처리하는 방법이 필요하다.

이제 버퍼를 활용해서 메시지를 모아보자.

// Saga

const channel = yield call(createSocketChannel,
  'deleteItem',
   buffers.expanding(50) // 임의로 50을 지정했다. 각 앱에 맞게 설정한다.
);

while (true) {
  const messages = yield flush(channel);
  const ids = messages.map(message => message.itemId);

  if (ids.length) {
    yield put(deleteItems(...ids));
  }

  yield delay(200); // 임의로 지정한 대기 시간이다. 각 앱에 맞게 설정한다.
}

이렇게 버퍼를 활용하면 약 200ms 동안 메시지를 모아서 deleteItems를 일괄적으로 처리할 수 있다. 또한, Throttle이나 Debounce와 다르게 메시지를 의도적으로 버리지 않는다.

그리고 또 중요한 점은 바로 if (ids.length) 구문이다. 만약 이 구문이 없으면 메시지를 받았든 받지 않았든 200ms마다 항상 dispatch 작업을 수행하여 개발자에게 엄청난 혼란을 줄 수 있으므로 주의하도록 하자.

마치며

리덕스를 사용하는 애플리케이션에서 소켓과 같은 외부 이벤트들을 함께 처리하는 것은 꽤 어렵다. 앞서 설명한 리덕스 생태계의 라이브러리들이나 리덕스 사가를 활용하면 조금 더 쉽게 외부 이벤트들을 처리할 수 있을 것이다.

이번에 리덕스 사가와 내부의 이벤트 채널, 버퍼, Matcher API를 활용해 소켓 이벤트를 간단히 처리할 수 있었다. 하지만, 그 예제나 설명이 생각보다 많이 없어서 처음에는 많은 어려움을 겪었다(리덕스 사가의 공식 문서는 꽤 잘 작성되어 있지만, 이벤트 채널에 대한 내용은 많이 없다). 이 글을 통해 리덕스 사가에서의 소켓 이벤트 처리에 조금이나마 도움이 되었길 바란다.

이민규2017.05.19
Back to list