본문 바로가기
MSA (Micro Service Architecture)/Simple MSA Prototyping

2. 실시간 데이터 동기화

by kellis 2020. 10. 16.

 

AS-IS TO-BE

페이스북에서 친구들에 의해 피드가 추가된다면, 'N개의 새로운 피드가 있습니다. 끌어서 당겨오기'라는 문구를 보게 됩니다. 이처럼 정적인 웹페이지에서 데이터가 실시간으로 변경되었다는 것을 알리거나, 자동으로 업데이트되는 기능이 필요한 경우가 있습니다.

MSA Prototype에는 유저의 정보가 업데이트 될 때, 웹페이지에서 페이지의 이동 없이 유저의 정보가 변경되는 기능을 추가하고자 했습니다. 이를 위해 크게 3가지의 작업을 했습니다.

  1. UserService의 update가 성공적인 결과를 반환했을 때, RabbitMQ의 exchange에 변경된 User 정보를 push 합니다.
  2. NotiService는 exchange가 보내는 데이터를 보관할 임시 Queue를 만들고, Queue에 데이터가 쌓이면 Websocket으로 그 데이터를 보냅니다.
  3. Angular App은 Websocket을 열어 NotiService가 Websocket으로 보내는 데이터를 실시간으로 받습니다.

 

1. RabbitMQ

UserService의 update가 성공적인 결과를 반환했을 때, RqbbitMQ의 exchange에 변경된 User 정보를 push 합니다.
RabbitMQ가 제공하는 exchange의 종류는 여러 가지가 있는데, 여기서는 Topic exchange를 썼습니다. 지금은 Queue가 한 개이므로 Fanout exchange 혹은 Direct exchange를 써도 무관합니다.

1) dependency 추가

<dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
</dependency>

 

2) RabbitMQ Config 등록

package kr.sys4u.demo.userservice.config;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
    @Bean
    public TopicExchange userChangedExchange() {
        return new TopicExchange("notification");
    }
     
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(rabbitMqJsonMessageConverter());
        return rabbitTemplate;
    }
      
    @Bean
    public Jackson2JsonMessageConverter rabbitMqJsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

MessageConverter에 대한 세팅을 해주는 이유는 User타입 json으로 변환하여 전달하기 위함입니다.

 

3) Send to Exchange

this.template.convertAndSend(exchange.getName(), UPDATE_ROUTING_KEY, savedUser);

Config에서 빈으로 등록했던 rabbitTemplate의 convertAndSend를 이용해 특정 exchange에 원하는 데이터를 보낼 수 있습니다. 두 번째 매개변수로 RountingKey를 입력받는데, 이는 TopicExchange를 사용했기 때문입니다. 이와 관련된 내용은 RabbitMQ Guide를 확인하시기 바랍니다. 

 


2. NotiService+Websocket

NotiService는 크게 두 가지 작업을 합니다.
  1. exchange가 보내는 데이터를 보관할 임시 Queue를 만든다.
  2. Queue에 대한 subscriber를 만들어, 데이터가 오는 즉시 Websocket으로 그 데이터를 전송한다.

 

1) dependency 추가

<dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

 

2) RabbitMQ Config 등록

package kr.sys4u.demo.notificationservice.config;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
    @Bean
    public TopicExchange userChangedExchange() {
        return new TopicExchange("notification");
    }
    @Bean
    public Queue tempQueueForUser() {
        return new AnonymousQueue();
    }
    @Bean
    public Binding bindingForUpdate(TopicExchange topic, Queue tempQueueForUser) {
        return BindingBuilder.bind(tempQueueForUser).to(topic).with("user.changed");
    }
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(rabbitMqJsonMessageConverter());
        return rabbitTemplate;
    }
      
    @Bean
    public Jackson2JsonMessageConverter rabbitMqJsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
  • 24라인의 .with에 대한 매개변수는 routingKey로, UserService에서 exchange로 전송할 때의 rountingKey와 일치해야 합니다.  Exchange의 데이터를 받는 쪽에서도 MessageConverter를 이용해야 Json데이터가 User타입으로 변환될 수 있습니다.

 

3) Websocket Config 등록

package kr.sys4u.demo.notificationservice.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app").enableSimpleBroker("/notification");
    }
}

 

4) Send Queue data to Websocket 

package kr.sys4u.demo.notificationservice.app.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import kr.sys4u.demo.common.dto.User;
@Component
public class NotificationReceiver {
    private final SimpMessagingTemplate template;
    @Autowired
    public NotificationReceiver(SimpMessagingTemplate template) {
        this.template = template;
    }
    @RabbitListener(queues = "#{tempQueueForUser.name}")
    public void receive(User user) throws InterruptedException, JsonProcessingException {
        user.setPasswd(null);
        template.convertAndSend("/notification/user/" + user.getUserId(), user);
    }
}

3. Angular stomp.js

Angular에서 Websocket포트를 열어 특정 topic에 대해 구독합니다. Websocket 포트를 열어주고 연결하는 것을 도와주는 라이브러리 ng2-stmpjs를 사용했습니다.
Websocket의 topic과 RabbitMQ의 Topic exchange는 다르니 참고하십시오.

 

1) npm install stomp

npm install @stomp/ng2-stompjs --save

 

2) Stomp.js 설정

Stomp.js를 이용했을 때의 장점은 애플리케이션에서 구독할 Websocket URL등의 설정 파일만 제공한다면 그 외의 작업은 모두 StompService에게 위임한다는 것입니다. 따라서 어플리케이션 로딩시에 StompService에게 설정파일만 제공해주면 됩니다. 

import { StompConfig, StompService } from '@stomp/ng2-stompjs';
 
export class WebSocketConfig {
    public static uri = 'ws://noti.msa.com:8080/ws';
    public static USER_TOPIC = '/notification/user/';
}
 
 
export const stompConfig: StompConfig = {
    url: WebSocketConfig.uri,
    headers: {
    },
    heartbeat_in: 0,
    heartbeat_out: 20000,
    reconnect_delay: 5000,
    debug: false
};

 

위의 설정 파일을 애플리케이션 로딩 시에 StompService에게 전달해줘야 하므로 app.module.ts의 providers에 다음을 추가합니다. 

providers: [
  StompService,
  {
    provide: StompConfig,
    useValue: stompConfig
  }
]

 

3) subscribe

구독하는 방법은 간단합니다. 

this.stompService.subscribe(topic).subscribe(function (message) {
     console.log(JSON.parse(message.body));
});

subscribe 메서드가 두 번 호출되는데, 이름만 같을 뿐 기능은 다릅니다. 첫 번째 subscribe의 매개변수에 구독하려는 topic을 넣어주고, 두 번째 subscribe의 매개변수에 callback function을 넣어줍니다.

 

4) unsubscribe

그러나 그 구독을 취소하기 위해서는 Subscription에 대한 unsubscribe를 해야 하므로 subscribe에 대한 결과를 미리 저장해야 합니다. 

const subscription: Subscription = this.stompService.subscribe(topic).subscribe(function (message) {
     console.log(JSON.parse(message.body));
});
subscription.unsubscribe();

 

Angular app의 비즈니스 로직에 stompService를 적용하면서 두 가지 이슈가 있었습니다.

  1. 화면을 새로고침 하면 구독하고 있던 토픽에 대해서 자동으로 Unsubscribe가 이루어집니다. 따라서 구독하는 목록들을 Localstorage에 저장해 두고, 애플리케이션이 initialize 될 때 그 목록들을 다시 Subscribe 해야 합니다. 이를 위해 Store와 Localstorage를 이용했습니다.
  2. Unsubscribe 하기 위해서는 Subscribe 한 결과 즉 Subscription이 필요하므로 전역적으로 저장해야 합니다. 이것을 WebsocketService에 subscriptions에 key-value 형식으로 저장하기로 결정했습니다.

앱이 로딩 시 행위, 로그인 시 행위, 로그아웃 시 행위에 대해 표로 정리했습니다.


4. Alternatives architecture

이 애플리케이션에서는 큐가 1개이지만, 큐가 필요하다면 NotiService를 수정해야 한다는 불편함이 있습니다. 앞으로 그 구조를 변경할지는 모르겠지만 대안이 될 수 있는 아키텍처에 대해 고민해봤습니다.

 

 

1) NotiService를 큐 단위로 분리합니다.

NotiService가 분리되면 Client가 알아야 하는 도메인이 늘어나게 되므로 NotiService 앞에 Nginx를 두어 Api gateway로 동작할 수 있도록 만들어주어야 합니다. 

그러나 만약 Notiservice들이 많아지면 exchange를 거치게 되는 데이터가 많아지므로 exchange에 과부하가 걸릴 수도 있습니다.

 

2) exchange를 큐 단위로 분리합니다. 

그러면 exchange에 들어오는 데이터를 토픽으로 분리할 것 없이 바로 큐에 전송하면 되므로 fanout exchange를 써도 괜찮고, direct exchange를 써도 무방할 것입니다. 

 

 

댓글