Logback KafkaAppender 사용하기
Logback은 로그 이벤트를 출력하는 방법으로 Appender를 이용합니다. 분산 서버 환경에서 ConsoleAppender나 FileAppender들을 이용한다면 해당 로그들은 서버 내에 쌓이게 됩니다. 이번 글에서는 애플리케이션에서 발생한 로그 이벤트를 아파치 카프카를 통하여 Publish 하는 방법에 대하여 다뤄보겠습니다.
Appender에 대하여 더 알아보고 싶다면 Logback - 4.Appenders (1). ConsoleAppender, FileAppender를 참고해주세요.
logback-kafka-appender
Log4j는 KafkaAppender를 직접 지원하지만 Logback의 경우 오픈소스로 제공되는 logback-kafka-appender를 사용합니다. logback-kafka-appender 사용을 위해서는 Logback version 1.2 이상이 필요합니다.
pom.xml - dependency 추가
pom.xml에 logback-kafka-appender와 logback-classic을 추가해줍니다. logback-kafka-appender는 kafka-client를 포함하고 있기 때문에, 이는 중복으로 추가하지 않아도 됩니다.
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>runtime</scope>
</dependency>
logback.xml - KafkaAppender 추가
<configuration>
<!-- Configurations... -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!-- kafkaAppender 추가 -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<topic>logs</topic>
<keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
<deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
<producerConfig>bootstrap.servers=10.106.156.132:9092</producerConfig>
<!-- Fallback 설정 -->
<appender-ref ref="STDOUT" />
</appender>
<logger name="kafkaLogger" level="INFO" additivity="false">
<appender-ref ref="kafkaAppender" />
</logger>
<root level="DEBUG">
<appender-ref ref="STDOUT" />
</root>
</configuration>
설정 파일을 살펴보겠습니다. 먼저 appender로 KafkaAppender를 지정합니다. encoding 방식은 PatternLayoutEncoder를 사용합니다.
KafkaAppender는 구성에는 topic 태그와 producerConfig 태그가 필요합니다. topic 태그는 발행할 토픽에 대한 값이 들어있어야 하며 producerConfig는 카프카 프로듀서의 properties를 설정을 위한 'key=value'값이 들어갑니다. 카프카 브로커의 bootstrap-server은 필수적이므로 이를 추가해줍니다. 나머지 설정들은 CONFLUENT Producer Configurations을 참고하시기 바랍니다.
또한 KafkaAppender는 appender-ref를 가질 수 있는데 이는 produce 과정이 실패했을 때 출력할 appender를 지정합니다.
kafkaAppender를 가진 kafkaLogger를 만들어줍니다. additivity="false"로 두어 kafkaLogger에서 출력하는 로그 이벤트는 상위 로거로 전파되지 않도록 합니다.
Kafka 준비
zookeeper와 kafka를 준비합니다. 설치 및 실행에 대해서는 카프카 시작하기 - (1) (Kafka QuickStart - (1))를 참고해주세요. 동작시켰다면, 이벤트에 대한 토픽을 생성합니다. 로그백 설정에 발행할 토픽 이름이 "logs" 였기 때문에 같은 이름으로 브로커에 토픽을 생성합니다.
# 'logs' 토픽을 생성
$ bin/kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092
Created topic logs.
# 'logs'에 대한 파티션 상태 확인
$ bin/kafka-topics.sh --describe --topic logs --bootstrap-server localhost:9092
Topic: logs PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: logs Partition: 0 Leader: 0 Replicas: 0 Isr: 0
이후 애플리케이션에서 발행한 이벤트를 읽어오기 위해 kafka consumer를 실행합니다.
$ bin/kafka-console-consumer.sh --topic logs --from-beginning --bootstrap-server localhost:9092
Log 생성
logback-kafka-appender 동작을 위해 간단한 controller를 생성합니다.
@RestController
@RequestMapping("/test")
public class TestController {
private static final Logger kafkaLogger = LoggerFactory.getLogger("kafkaLogger");
@GetMapping("/hello")
public void hello(@RequestParam(value = "name") String name) {
kafkaLogger.info("name:{}", name);
}
}
LoggerFactory를 통해 앞서 만들어준 kafkaLogger를 가져옵니다. 쿼리로 전달된 name에 대하여 kafkaLogger에 출력하도록 합니다.
localhost:8080/test/hello?name=changwooSon
위의 GET Request를 통해 TestController에 접근합니다. name으로 changwooSon이라는 값을 쿼리에 담아 전달하고, 실행했던 kafka-consumer를 확인합니다. appender에 지정한 encoder방식대로 로그 이벤트가 발행되고, 컨슈머가 전달됨을 확인할 수 있습니다.
# d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
21:31:14.208 [http-nio-8080-exec-1] INFO kafkaLogger - name:changwooSon
Async 한 KafkaAppender 추가
Kafka의 메타데이터의 교환이 일어날 때, 로그 이벤트는 브로커로 전달되지 못합니다. logging context는 최대 브로커에서 지정한 'metadata.max.age.ms' 시간만큼은 블록킹 상태로 머물게 됩니다. 이를 위해 AsyncAppender를 생성하고 appender-ref로 KafkaAppender를 전달하여 블록킹 방식을 피할 수 있습니다.
<configuration>
<!-- This is the kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- Kafka Appender configuration -->
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="kafkaAppender" />
</appender>
<root level="info">
<appender-ref ref="ASYNC" />
</root>
</configuration>