본문 바로가기
Delvelopment/Redis

프로모션을 대비한 대기열 시스템 구성하기 (Redis, WebSocket, Spring)

by 제제킴 2022. 8. 10.
반응형

회사내 기술블로그에 작성했던 글인데.. 기술블로그 오픈에 대한 내용이 홀딩되면서 1년이 지난 시점에 재작성 해봣다.

BackEnd 위주 구성에 대한 내용들이니 참고하길 바랍니다~!

 

✔개요

대기열 시스템의 탄생 배경

이번에 회사에서 새로운 상품을 오픈하기 위해 대대적인 마케팅?을 진행한다는 것을 전달받아 대기열 시스템을 설계, 개발을 진행했습니다. 


왜? 우리는 대기열 시스템이 필요했을까?

현재 대고객 서비스를 위한 개발 및 운영하고 있으며, 상품에 대한 주요정보 등을 코어영역와의 통신을 통해 처리하고 있다. (우리는 고객을 상대하는 채널이다.)
우리가 도입하려는 대기열 시스템은 고객의 동접을 대응하지 못할때 FIFO(선입선출) 방식으로 순차적으로 트래픽을 처리하기 위한 방안입니다.
레거시 시스템의 경우에는 On-Premise 환경으로 구성이 되어있어 트래픽이 몰릴 시 서버 확장 대응에 어려움이 있다. (오토 스케일링이 불가능) 신규 시스템의 경우에는 (클라우드 환경) 트래픽이 몰릴 시 서버 확장 대응에 용이 하다

결국에 레거시 시스템에서 병목이 발생하고... 모든 사이트는 마비가 된다. 
우리는 장애전파(Circuit Breaker)를 막기 위해 대기열 시스템을 만들자!

 


✔ 설계시 생각한 방안

  • 1안
    • MQ(Kafka)를 활용한 대기열 시스템.
    • 장점
      • 미들웨어를 활용하므로써 정확한 MQ 시스템 구성
      • 속도 및 안정성 최고
      • 구축 경험이 있어 러닝커브 적음
    • 단점
      • 기존에 도입되지 않아서 구성 시간이 필요.
      • 단발성으로 사용하기에 비용적 측면으로 비효율적.
  • 2안
    • Redis를 활용한 대기열 시스템
    • 장점
      • List, Sorted Set와 같은 자료형을 사용하면 MQ 구현가능 할 듯
      • Cache의 장점 부각, 속도 최고.
      • 러닝커브 적음
    • 단점
      • Redis가 죽으면 끝이다.

 ✅ 우리는 2안을 선택했고 시간적인 여유가 없었고, 현명한 선택이라 생각한다.

 


시스템구성

우리는 k8s 기반의 MSA 시스템 환경을 맞추기 위해, Spring Boot 기반의 Spring WebFlux를 도입하여 사용하였다.
Spring WebFlux는 Srping MVC와는 달리 Servlet과는 관계없이 만들어졌으며, WebFlux에서의 웹서버는 기본 설정은 Netty기반이지만 우리는 Spring Boot 에서 공식 지원 내장하는 (Lightweight, WebFlux에 적합, WebSocket 지원) Undertow를 적용했다.
WebFlux를 기반으로 논블록킹(Non-blocking) 비동기(Async) 프로그래밍을 도입했으며 Redis를 활용하였는데 reactive에 맞추어 제공되는 redis-reative 라이브러리를 사용햇다.

Fornt end와의 통신은 WebSocket과 REST API를 통해 통신을 진행하였으며,
Stomp(Simple Text Oriented Mssage Protocol)을 통해 WebSocket을 효율적으로 다루기 위한 프로토콜을 적용했다.
기존 WebSocket과는 다르게 Stomp는 Publisher/Subscriber 구조로 되어있다.
(즉, 구독한 사람들에게 메시지를 요청하면 메시지를 전달하는 형태)
대기열에서 작업열로 이동하는데 사용한 Spring-Batch도 도입하였다.

  • Java 11
  • Undertow
  • AWS WebServer
  • websocket (Stomp, SockJS)
  • Spring webflux
  • Redis-reactive
  • Junit5
  • Spring Batch

 


✔ 대기열의 구성은?

대기열의 구성은 오직 Redis와 Spring Boot 기반으로 구축하였다.
Redis 자료형 중 하나인 SortedSet을 사용했으며,
SortedSet은

  • Set과 Hash가 혼합된 타입이며,
  • 하나의 Key에 Score와 Value로 구성되어 있다.
  • Score기준으로 정렬이 가능하고
  • Queue 처럼 사용 가능하다

SortedSet 채택이유는 아래와 같다.

  • 각 명령어들의 효율적인 시간복잡도 O(1) ~ O(log(N)
    • 현재순위 조회 - Element의 현재위치 확인가능 ZRANK 명령어
      • 시간복잡도 - O(log(N))
    • 대기자 전체 크기 - Value의 전체 크기 ZCARD
      • 시간복잡도 - O(1)
    • 대기열에서 이동하는 Element Range 조회
      • 시간복잡도 - O(log(N))
    • 대기열의 Element 삭제
      • 시간복잡도 - O(log(N))
      • 자료형 List을 사용했을때는 Element 삭제시 시간복잡도 O(N)

 ✅ 우리가 대기열을 구성하기에 필요한 Queue를 적용할때 충분한 요건을 가진 자료형이다.

 


  • 대기열 로직
    1. 고객 대기열 페이지 진입.
    2. 대기열 Queue에 고객 Key값 Insert
    3. Batch에서 일정 시간마다 작업업열로 이동 가능한 Capability 확인
    4. 가능한 Capability 있으면 대기열에서 작업열로 이동.
  • 대기열 페이지에 진입한 고객
    1. 현재 위치 확인, 대기열 전체 사이즈 및 작업열 이동여부 조회.
    2. 작업열 이동 가능 여부 확인.
    3. 작업열에 유효한 Key값인지 확인
    4. 기존 발급된 이력이나, 만료된 Key값인지 확인.
    5. 동접 Ticket 발부.
    6. 세션 유지.
    7. 고객 이탈시 expire을 통해 세션 삭제.

 🔥 즉, 고객은 대기열 진입시 발급된 Key 값을 통해 작업열까지 진입하고, 티켓을 발급 받으면 티켓과 key값을 통해 고객 세션을 유지한다. 고객 세션이 만료되면 작업 가능한 Capability가 생긴다.


✔ WebSocket (Stomp)

WebSocket (Stomp) 사용 이유

  • 우리는 REST API를 통해서 통신을 진행할 수 있었지만 WebSocket을 사용했다.
  • 왜?
    • HTTP 통신의 Connection에 Cost를 줄이고자 하였고, WebSocket을 적용하였을때 Handshake를 최초에만 진행하여 전체적인 네트워크 Cost를 줄일 수 있다는걸 결론으로 도출했다.
    • WebSocket은 HTTP Protocol로 Connection을 초기에 맺고, ws(WebSocket),wss Protocol로 Upgrade한 후 서로에게 Heartbeat를 주기적으로 발생시켜 Connection을 유지하고 있는지 확인한다.
    • WebSocket(Stomp)은 브라우저에 대한 호환성 때문에도 채택을 진행했다.

 💡 즉, Cost를 절감해 고객의 유입폭이 더 증가 할 수 있다 라는 결과를 뽑았다. ( HTTP API 통신을 안한 것은 아니다. 필요에 따라 사용했다. )

WebSocket (Stomp) 연결

@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    public WebSocketConfig(StompHandler stompHandler) {
        this.stompHandler = stompHandler;
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/connect")
                .setAllowedOriginPatterns("*")
                .withSockJS()
                .setClientLibraryUrl("<https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.3.0/sockjs.min.js>");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
//        registration.interceptors(new HeaderCheckInterceptor());
        registration.interceptors(stompHandler);
    }

}
  • registerStompEndpoints
    • Websocket Connection에 관련된 설정이다.
    • SockJS를 이용해 STOMP end-point를 설정해준다.
    • withSockJS()
      • 브라우저에서 Websocket을 지원하지 않을 경우 Fallback 옵션을 활성화 하는데 사용한다.
      Enable SockJS fallback options.
    • setAllowedOriginPatterns("*")
      • WebSocket에서 Cors 처리를 위한 허용 패턴.
      • 일정 버전부터 setAllowedOrigins 메서드가 사용되지 않아 setAllowedOriginPatterns을 사용함.
      Alternative to setAllowedOrigins(String...) that supports more flexible patterns for specifying the origins for which cross-origin requests are allowed from a browser. Please, refer to CorsConfiguration.setAllowedOriginPatterns(List) for format details and other considerations. By default this is not set. Since: 5.3.2
  • configureMessageBroker
    • 메시지브로커에 대한 Prefix 설정.
    • config.setApplicationDestinationPrefixes
      • Socket 통신시 End-Point 목적지의 Prefix 설정이다.
      • 즉, Client Side에서 Server 사이드로 보내는 Message
      Configure one or more prefixes to filter destinations targeting application annotated methods. For example destinations prefixed with "/app" may be processed by annotated methods while other destinations may target the message broker (e.g. "/topic", "/queue"). When messages are processed, the matching prefix is removed from the destination in order to form the lookup path. This means annotations should not contain the destination prefix. Prefixes that do not have a trailing slash will have one automatically appended.
    • config.enableSimpleBroker
      • Subscriber에게 메시지를 보낼때의 목적지의 Prefix 설정이다.
      • 즉, Server Side에서 Client Side로 보내는 Message
      Configure the prefix used to identify user destinations. User destinations provide the ability for a user to subscribe to queue names unique to their session as well as for others to send messages to those unique, user-specific queues. For example when a user attempts to subscribe to "/user/queue/position-updates", the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a unique queue name that does not collide with any other user attempting to do the same. Subsequently when messages are sent to "/user/{username}/queue/position-updates", the destination is translated to "/queue/position-updatesi9oqdfzo". The default prefix used to identify such destinations is "/user/".
  • configureClientInboundChannel
    • Inbound 메시지에 대한 Intercepter처리를 할 수 있다.
    • JWT 인증과 같은 인증 로직에 주로 이용하고 있으며, @ChannelInterceptor 어노테이션을 이용하니 필요하면 참고해보길 바란다.

publish / subscribe

@Slf4j
@RestController
public class SocketHandler {

    @MessageMapping("/sendMessage/{key}")
    @SendTo("/topic/public/{key}")
    public String hello(String str){
        log.info("Check in hello -> " + str);
        return "your message -> " + str;
    }

}
  • @MessageMapping
    • Publish Target Url
    • Client to Server
    Destination-based mapping expressed by this annotation. For STOMP over WebSocket messages this is AntPathMatcher-style patterns matched against the STOMP destination of the message.
  • @SendTo
    • Subscribers 한테 Message를 전송한다
    • Server to Client
  • {key}
    • 우리는 고객마다 고유한 End-Point를 Key값을 통해 지정했다.

Junit 을 이용한 Client Test

@ActiveProfiles("local")
class SocketControllerTest {

    final String TARGET_URI = "<http://localhost:30001/connect>";
		final String SENDMESSAGE_URI = "/app/sendMessage/123456";
		WebSocketStompClient stompClient;

    private List<Transport> createTransportClient(){
        List<Transport> transports = new ArrayList<>();
        transports.add(new WebSocketTransport(new StandardWebSocketClient()));
        return transports;
    }

    @BeforeEach
    public void setup() throws InterruptedException{
        stompClient = new WebSocketStompClient(new SockJsClient(createTransportClient()));

        stompClient.setMessageConverter(new MappingJackson2MessageConverter());
    }

		@Test
    public void contextLoad() throws ExecutionException, InterruptedException, TimeoutException {

            WebSocketHttpHeaders httpHeaders = new WebSocketHttpHeaders();
            httpHeaders.add("jwt" , "test");
            StompHeaders stompHeaders = new StompHeaders();
            StompSession stompSession = stompClient.connect(TARGET_URI, httpHeaders, stompHeaders, new StompSessionHandlerAdapter() {
            }).get(1, TimeUnit.SECONDS);

            // Send
            stompSession.send(SENDMESSAGE_URI, "test");

		}
}
  • Junit의 WebSocketStompClient을 사용하여 Server side와의 WebSocket 연동 Test를 진행.
    • 소스참고.

 


✔ Redis

대기열에 큰 Point를 가지고 있는 Redis에 대한 셋팅이다.

Redis reative 설정

@Slf4j
@Configuration
@EnableCaching
public class RedisConfiguration {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;

    
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        RedisSerializer<String> serializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(String.class);
        RedisSerializationContext serializationContext = RedisSerializationContext
                .<String, String>newSerializationContext()
                .key(serializer)
                .value(jackson2JsonRedisSerializer)
                .hashKey(serializer)
                .hashValue(jackson2JsonRedisSerializer)
                .build();

        return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
    }

}
  • Redis를 Reative 방식으로 Template 설정.

주로 사용한 RedisUtils

@Slf4j
@Component
@AllArgsConstructor
public class RedisUtils {

    private final RedisTemplate<String ,Object> redisTemplate;

    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

		/**
     * @desc: Sorted Set 삭제.
     */
		public Mono<Long> delete(String key){
       return reactiveRedisTemplate.delete(key);
    }

		/**
     * @desc: Sorted Set 조회
     */
		public Mono<String> getValue(String key){
        return reactiveRedisTemplate.opsForValue().get(key);
    }

		/**
     * @desc: RedisTemplate에 SortedSet 초기화.
     */
    public ReactiveZSetOperations opsForZSet(){
        return reactiveRedisTemplate.opsForZSet();
    }

		/**
     * @desc: Sorted Set 자료형 사이즈
     */
    public Mono<Long> zCard(String str){
        ReactiveZSetOperations z = reactiveRedisTemplate.opsForZSet();
        return z.size(str);
    }

		/**
     * @desc: Sorted Set 자료형 start ~ end 까지 조회.
     */
    public Flux<String> zRange(String key, Long start, Long end){
        return opsForZSet().range(key, Range.closed(start, end));
    }

		/**
     * @desc: Sorted Set 자료형 Value의 현재위치 조회.
     */
    public Mono<Long> getzRank(String key, String value){
        return opsForZSet().rank(key, value);
    }
}

 


✔ 스트레스 테스트

고민한 설계와 개발을 끝냈으니 성능 테스트를 진행했다.
테스트 Tool 선택에 대한 고민이 컸다. nGrinder, JMeter들을 사용했으나 정확한 테스트가 진행되지 못했고, JUnit을 통해 부하 테스트를 진행했다.

시나리오

  • Connection → Subscribe → Send 와 같은 시나리오로 부하 테스트를 진행했다.

Stress Size

  • 약 15분간 총 12만건 진입
    • 분당 약 7천명
    • 초당 약 120명

결과

아래 수치를 보면 더 정확하게 확인이 가능하다.

  • Redis는 역시나 강력했고, 12만명의 동접자를 충분히 버틸 수 있는 수치가 측정되었다.
  • CPU와 RAM이 20% 까지는 쳤지만 이정도 수치를 3~4배로 가정해도 서비스가 버틸 수 있는 수치이다.

 


 

✔ 마무리

상품을 오픈하고 대기열 서비스를 오픈했었고, 큰 이슈 없이 프로모션들을 넘어갔다. 그리고 아쉽게 죽어있는 상태 ㅎㅎ
많은 사람들이 레거시에 대한 문제와 기술 도입에 대한 보수적인 의견을 가지고 있다고 생각한다.
틀린말은 아니다. 그걸 해결하고 극복하기 위해 내가 존재하며, 좋은 결과물을 도출해낼 것이다.

 

반응형

댓글