이전 게시글에서 Kafka 설치부터 Procuder와 Consumer를 이용한 발행/구독 과정까지 정리하였습니다. 이번 글에서는 spring-boot를 통해 간단한 Producer와 Consumer를 만들고 이벤트를 발행/구독하는 과정을 살펴보겠습니다.
1. 프로젝트 준비
먼저 스프링 부트 프로젝트를 준비합니다. 빠른 프로젝트 생성을 위해 spring initializr를 사용하는 방법도 있습니다. 프로젝트를 준비했다면 Kafka 사용을 위해 pom.xml에 dependency를 추가해줍니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
스프링 카프카를 사용하기 위해 application.yml에 아래의 설정을 추가해줍니다. {kafka-server-ip}는 kafka가 설치된 서버의 아이피로 수정해주세요.
server
port: 9000
spring:
kafka:
consumer:
bootstrap-servers: {kafka-server-ip}:9092
group-id: group_id
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: {kafka-server-ip}:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
2. Producer 생성
Producer에 kafkaTemplate을 주입해줍니다. kafkaTemplate.send(topic, data) 메서드를 통해 Producer는 이벤트를 발급할 수 있습니다. 파라미터로는 토픽 명과 이벤트(data)를 발행합니다. 토픽명은 test-events 로 만들어주겠습니다.
@Service
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "test-events";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
logger.info("#### -> Producing message -> {}", message);
this.kafkaTemplate.send(TOPIC, message);
}
}
3. Consumer 생성
Consumer는 kafka 브로커로부터 이벤트를 요청하여 가져옵니다. @KafkaListener 애노테이션을 통해 구독하려는 이벤트의 토픽과 그룹을 간편하게 설정할 수 있습니다. 애노테이션에 topics값에 producer에서 발행할 토픽과 같이 test-events를 넣어줍시다. 해당 토픽에 대한 이벤트 메시지는 consume함수의 파라미터로 전달받아올 수 있는데요, 실제 kafka를 이용하여 서비스를 구축한다면 consume 함수 내에 비즈니스 로직이 구현됩니다.
@Service
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(topics = "test-events", groupId = "group_id")
public void consume(String message) {
logger.info("#### -> Consumed message -> {}", message);
}
}
4. Controller 생성 및 테스트
Producer와 Consumer를 모두 만들어주었는데요, 이제 테스트를 위한 Controller를 만들어 주겠습니다.
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final Producer producer;
@Autowired
public KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping("/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}
}
이제 애플리케이션을 실행하고 KafkaController에 POST 요청을 보냅니다.
애플리케이션 실행 전 Zookeeper와 Kafka broker 가 동작중인지 체크해주세요.
$ curl -X POST -F 'message=test' http://localhost:9000/kafka/publish
스프링 부트 로그에서 Producer가 이벤트를 브로커에 잘 전송했는지, Consumer가 이벤트를 브로커로부터 잘 받아왔는지 확인합니다. 잘 동작한다면 아래와 같은 로그를 확인할 수 있습니다.
2020-09-05 19:47:53.953 INFO 18208 --- [nio-9000-exec-1] com.demo.engine.Producer : #### -> Producing message -> test
2020-09-05 19:47:54.025 INFO 18208 --- [ntainer#0-0-C-1] com.demo.engine.Consumer : #### -> Consumed message -> test
카프카 설치부터 스프링부트를 이용한 발행/구독 모델을 만들고 테스트해보았습니다. 궁금하거나 부족한 내용이 있다면 댓글로 남겨주세요~
감사합니다 :)