Link
Today
Total
09-23 12:18
Archives
관리 메뉴

초보개발자 긍.응.성

카프카 시작하기 - (2) (Kafka QuickStart - (2)) 본문

Kafka

카프카 시작하기 - (2) (Kafka QuickStart - (2))

긍.응.성 2020. 9. 5. 20:03
반응형

이전 게시글에서 Kafka 설치부터 Procuder와 Consumer를 이용한 발행/구독 과정까지 정리하였습니다. 이번 글에서는 spring-boot를 통해 간단한 Producer와 Consumer를 만들고 이벤트를 발행/구독하는 과정을 살펴보겠습니다.

 

카프카 시작하기 - (1) (Kafka QuickStart - (1))

Kafka 설치부터 간단한 spring-boot 애플리케이션과 연결하여 동작시키는 과정을 정리한 글입니다. Apache Kafka QuickStart 페이지와 CONFLUNET 예제를 참고하였습니다. 1. Kafka 다운로드 Kafka를 다운로드합니.

ckddn9496.tistory.com

 

1. 프로젝트 준비

먼저 스프링 부트 프로젝트를 준비합니다. 빠른 프로젝트 생성을 위해 spring initializr를 사용하는 방법도 있습니다. 프로젝트를 준비했다면 Kafka 사용을 위해 pom.xml에 dependency를 추가해줍니다. 

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

 

스프링 카프카를 사용하기 위해 application.yml에 아래의 설정을 추가해줍니다. {kafka-server-ip}는 kafka가 설치된 서버의 아이피로 수정해주세요.

server
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: {kafka-server-ip}:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: {kafka-server-ip}:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

2. Producer 생성

Producer에 kafkaTemplate을 주입해줍니다. kafkaTemplate.send(topic, data) 메서드를 통해 Producer는 이벤트를 발급할 수 있습니다. 파라미터로는 토픽 명과 이벤트(data)를 발행합니다. 토픽명은 test-events 로 만들어주겠습니다. 

@Service
public class Producer {
	private static final Logger logger = LoggerFactory.getLogger(Producer.class);
	private static final String TOPIC = "test-events";

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage(String message) {
		logger.info("#### -> Producing message -> {}", message);
		this.kafkaTemplate.send(TOPIC, message);
	}
}

 

3. Consumer 생성

Consumer는 kafka 브로커로부터 이벤트를 요청하여 가져옵니다. @KafkaListener 애노테이션을 통해 구독하려는 이벤트의 토픽과 그룹을 간편하게 설정할 수 있습니다. 애노테이션에 topics값에 producer에서 발행할 토픽과 같이 test-events를 넣어줍시다. 해당 토픽에 대한 이벤트 메시지는 consume함수의 파라미터로 전달받아올 수 있는데요, 실제 kafka를 이용하여 서비스를 구축한다면 consume 함수 내에 비즈니스 로직이 구현됩니다. 

@Service
public class Consumer {
	private final Logger logger = LoggerFactory.getLogger(Consumer.class);

	@KafkaListener(topics = "test-events", groupId = "group_id")
	public void consume(String message) {
		logger.info("#### -> Consumed message -> {}", message);
	}
}

 

4. Controller 생성 및 테스트

Producer와 Consumer를 모두 만들어주었는데요, 이제 테스트를 위한 Controller를 만들어 주겠습니다.

@RestController
@RequestMapping("/kafka")
public class KafkaController {

	private final Producer producer;

	@Autowired
	public KafkaController(Producer producer) {
		this.producer = producer;
	}

	@PostMapping("/publish")
	public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
		this.producer.sendMessage(message);
	}
}

 

이제 애플리케이션을 실행하고 KafkaController에 POST 요청을 보냅니다.

애플리케이션 실행 전 Zookeeper와 Kafka broker 가 동작중인지 체크해주세요. 
$ curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

스프링 부트 로그에서 Producer가 이벤트를 브로커에 잘 전송했는지, Consumer가 이벤트를 브로커로부터 잘 받아왔는지 확인합니다. 잘 동작한다면 아래와 같은 로그를 확인할 수 있습니다.

2020-09-05 19:47:53.953  INFO 18208 --- [nio-9000-exec-1] com.demo.engine.Producer                 : #### -> Producing message -> test
2020-09-05 19:47:54.025  INFO 18208 --- [ntainer#0-0-C-1] com.demo.engine.Consumer                 : #### -> Consumed message -> test

카프카 설치부터 스프링부트를 이용한 발행/구독 모델을 만들고 테스트해보았습니다. 궁금하거나 부족한 내용이 있다면 댓글로 남겨주세요~

감사합니다 :)

※ 참고 링크: www.confluent.io/blog/apache-kafka-spring-boot-application/?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.nonbrand_tp.prs_tgt.kafka_mt.xct_rgn.apac_lng.eng_dv.all&utm_term=spring%20kafka&creative=&device=c&placement=&gclid=Cj0KCQjwy8f6BRC7ARIsAPIXOjhx6ss21Yr4ilmaYMl_YjrCS_EaYYSR2Spj2P0IRy-qmVawsgoW1PoaAoUgEALw_wcB

반응형

'Kafka' 카테고리의 다른 글

카프카 시작하기 - (1) (Kafka QuickStart - (1))  (2) 2020.09.05
Comments