🔗 Distributed System

[Kafka] Kafka Streams 사용법

loose 2025. 1. 9. 15:46
반응형

개요

Kafka Streams를 사용하면 특정 시간동안 Kafka에 인입되는 데이터를 집계해서 통계치를 추출할 수 있습니다.

토픽에 있는 데이터를 빠른 속도로 실시간으로 변환하여 다른 토픽에 적재할 수 있는 것이 기본 동작 방식입니다.

아래 예제 코드는 깃헙에 올려놨습니다.

기본 코드

아래는 /a라는 메소드 요청에 대해 시간 별로 요청 건 수를 통계화해서 /b로 확인하는 코드입니다.

결과만 보면 /a라는 메소드는 14시에 2번, 15시에 4번 이용했다는 의미가 됩니다.

@RestController
@RequestMapping("/api")
@RequiredArgsConstructor
public class ApiController {

    private final ApiService apiService;

    @PostMapping("/a")
    public ResponseEntity<String> callApiA() {
        apiService.processApiCall();
        return ResponseEntity.ok("API A called successfully");
    }

    @GetMapping("/b")
    public ResponseEntity<Map<String, Long>> getHourlyStats() {
        return ResponseEntity.ok(apiService.getHourlyStats());
    }
}

 

@Service
@RequiredArgsConstructor
public class ApiService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final KafkaStreamsService kafkaStreamsService;
    private static final String TOPIC_NAME = "api-calls";

    public void processApiCall() {
        String timestamp = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
        kafkaTemplate.send(TOPIC_NAME, timestamp);
    }

    public Map<String, Long> getHourlyStats() {
        return kafkaStreamsService.getHourlyStats();
    }
}

/a 메소드를 사용하면 api-calls 토픽에 timestamp를 저장합니다.

 private static final String TOPIC_NAME = "api-calls";
    private static final String STORE_NAME = "hourly-counts";
    private KafkaStreams kafkaStreams;

    @Autowired
    private StreamsBuilder streamsBuilder;

    @PostConstruct
    public void init() {
        KStream<String, String> stream = streamsBuilder.stream(TOPIC_NAME, 
            Consumed.with(Serdes.String(), Serdes.String())); 

        stream
            .groupBy((key, value) -> {
                LocalDateTime dateTime = LocalDateTime.parse(value, DateTimeFormatter.ISO_DATE_TIME);
                return dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH"));
            }, Grouped.with(Serdes.String(), Serdes.String()))
            .count(Materialized.as(STORE_NAME));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "api-stats-application"); 
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();
    }

 

기본 개념

Kafka Streams 애플리케이션의 고유 ID

Kafka Streams를 사용하면 애플리케이션이 사용하는 고유 ID를 지정해야 합니다.

위에서는 api-stats-application이라고 명명했습니다.

ID를 지정하는 이유는 간단하게는 서버가 여러 대일 경우 클러스터링이 가능하고 Kafka Streams 애플리케이션 자체가 하나의 소비자 그룹처럼 동작하기 위함입니다.

Serde

Serde는 '세르드' 혹은 '서드'라고 불리우는 Serialization과 Deserialization의 합성어입니다.

Kafka의 토픽을 소비할 때 직렬화, 역직렬화에 대한 Serializer를 설정하기 위함인데, 위와 같이 props에 설정해서 전역적으로 사용할 수도 있고 개별적으로 Consumed.with를 통해 옵션 값으로 설정해서 사용할 수도 있습니다.

Kafka Streams의 상태 저장소

Kafka Streams의 핵심 개념입니다.

통계라고 하면 일정 시간 동안 데이터를 저장해야 하는데 이 때 사용하는 것이 상태 저장소입니다.

상태 저장소는 Kafka Streams가 실행되는 애플리케이션에 RocksDB라는 데이터베이스를 사용해서 디스크에 저장합니다.

다시 말해, Spring Boot를 이용한다면 Spring Boot가 실행되는 서버에 저장됩니다.

아래와 같이 Kafka Streams 의존성이 추가되면 RocksDB 의존성이 따라 붙습니다.

그리고 상태 저장소에 저장된 데이터는 일정 시간마다 Kafka 브로커에 전달되어 자동으로 생성된 Topic에 데이터를 저장하게 됩니다.

위에서는 hourly-counts라는 상태 저장소(State Store) 를 명명해서 사용했기 때문에 changelog라는 특별한 토픽이 아래와 같이 자동 생성됩니다.

changelog가 아닌 일반적으로 생성한 Topic은 바로 Kafka 브로커의 디스크에 저장되는 반면(쉽게 말해 카프카 서버에 저장되는 데이터), 상태 저장소는 우선 DB에 저장되었다가 Kafka 브로커에 데이터를 전달해서 토픽에 저장하는 방식이라고 생각하면 됩니다.

그러므로 상태 저장소는 Kafka Streams 애플리케이션이 재시작될 때 복구되므로 데이터는 영구적으로 유지됩니다.

 

 

728x90