카프카 컨슈머에 동적 쓰로틀링 적용하기

18 hours ago 2

이 글은 카프카(Kafka)를 사용하는 스프링 환경에서 메시지 처리 속도를 동적으로 조절해야하는 상황과 여러 쓰로틀링 기법들을 소개합니다. 카프카를 사용하는 스프링 프레임워크 환경에 익숙하고, 카프카의 기본 개념 및 컨슈머와 컨슈머 그룹, 파티션 구조를 이해하는 독자를 대상으로 작성되었습니다. 모든 예제의 전체 소스코드는 GitHub 저장소에서 확인하실 수 있습니다.

분산 서버에서 대규모의 요청이 발생할 때 수평 확장을 통해 처리량을 늘릴 수 있습니다. 마찬가지로, 카프카(Kafka)에 메시지 유입량이 증가하여 랙(Lag)이 발생하면 파티션 수를 늘리고 컨슈머를 수평 확장하여 처리량을 늘릴 수 있습니다. 하지만 컨슈머를 수평 확장해 처리량을 늘릴 때 반드시 고려해야 할 것이 있습니다. 바로 컨슈머의 수를 늘리더라도 컨슈머가 의존하고 있는 API나 DB같은 외부 시스템의 부하 가능성을 염두에 두어야 한다는 점입니다.

컨슈머가 DB에 의존하는 상황을 예로 들어보겠습니다. 실제 팀 내부에 있는 시스템 중 하나를 단순화해 표현한 것입니다. 토픽(topic)에 저장된 메시지의 페이로드(payload)에는 데이터가 있고 컨슈머는 메시지의 데이터를 DB에 INSERT하는 단순한 구조입니다.

이때 메시지의 유입량이 급증하면 컨슈머 그룹의 처리량이 따라가지 못하게 되고 랙(Lag)이 쌓이게 됩니다. 이 랙을 효과적으로 해소하기 위해서 무작정 컨슈머 수를 늘려 대응할 경우 DB의 리소스가 고갈되는 상황을 맞이할 수 있습니다. 실제로 저희 팀에서는 마스터 DB의 CPU 사용률이 80% 이상이 되어 알람이 발생했습니다.

결국 수평 확장 하더라도 컨슈머 그룹의 최대 처리량은 외부 시스템 리소스 한계로 결정됩니다. 위 예에서는 허용 가능한 최대 CPU 사용률까지 컨슈머 수를 조절해, 가용 리소스 내에서 최대 처리 속도로 랙을 해소할 수 있겠습니다.

하지만 외부 시스템의 한계(CPU 사용률뿐 아니라 메모리, 커넥션 등)를 고려해 늘릴 수 있는 컨슈머 수를 정하는 것은 정량적으로 결정하기 어렵고 결정하더라도 외부 시스템 변경에 영향을 받게 됩니다. 그리고 실제로는 DB를 사용하는 시스템이 다양하고, 컨슈머 수 조절은 컨슈머 그룹의 리밸런싱을 트리거해 메시지 처리 지연이나 중복 처리 등의 문제를 일으킬 수 있습니다.

컨슈머에서 프로그래밍으로 동적 쓰로틀링(Throttling)하기

앞서 "컨슈머 그룹의 최대 처리량은 외부 시스템 리소스 한계"라는 것과 처리량을 파티션과 컨슈머의 수로 조절할 때의 단점을 말씀드렸습니다. 그렇다면 리밸런싱을 피하면서도 외부 시스템에 부하를 주지 않는 선에서 동적으로 메시지 처리량을 조절하는 방법은 없을까요?

이 글에서 소개해드리는 방법은 컨슈머 수가 충분한 상황에서 컨슈머에 의도적인 지연을 주어 처리량을 쓰로틀링(Throttling)하는 것입니다. 이렇게 하면 컨슈머 수가 바뀌지 않아 리밸런싱을 피할 수 있고, 또 프로그래밍으로 지연 시간을 설정할 수 있다면 외부 시스템의 리소스를 모니터링해 자동으로 지연 시간을 조절하는 것이 가능합니다.

지연 시간을 적용하는 방법을 살펴보기 전에 CPU 사용률을 모니터링할 수 있는 객체를 만들어 보겠습니다.

CpuUsageMonitorClient 인터페이스를 정의하고 MockCpuUsageMonitorClient 구현체를 정의하였습니다. MockCpuUsageMonitorClient는 호출될 때마다 5%씩 증가된 CPU 사용률을 최대 90%까지 응답하도록 만들어진 Mock 객체입니다.(실제 상황에서는 AWS를 사용 중이라면 CloudWatchClient로 CPUUtilization 메트릭을 가져올 수 있겠습니다.)

public interface CpuUsageMonitorClient { int getCpuUsage(); } // 호출시마다 CPU 사용량을 5%씩 80%까지 증가시키는 Mock Client public class MockCpuUsageMonitorClient implements CpuUsageMonitorClient { private static final int CPU_USAGE_LIMIT = 90; private final AtomicInteger cpuUsage = new AtomicInteger(0); @Override public int getCpuUsage() { return cpuUsage.updateAndGet(current -> Math.min(current + 5, CPU_USAGE_LIMIT)); } }

그런 다음 CPU 사용률로 지연 시간을 계산하는 객체를 정의하였습니다.

public interface DelayTimeCalculator { long calculateDelayTime(); } @RequiredArgsConstructor public class CpuMonitoringDelayTimeCalculator implements DelayTimeCalculator { private final CpuUsageMonitorClient cpuUsageMonitorClient; @Override public long calculateDelayTime() { ... } }

CpuMonitoringDelayTimeCalculator는 CpuUsageMonitorClient가 응답한 CPU 사용률로 지연 시간을 계산하는 객체입니다. MIN_DELAY_TIME와 MAX_DELAY_TIME 사이에서 다음 그래프와 같은 형태로 지연시간을 반환합니다.

CpuMonitoringDelayTimeCalculator의 로직 자세히 보기 @Slf4j @RequiredArgsConstructor public class CpuMonitoringDelayTimeCalculator implements DelayTimeCalculator { private static final int ALLOWABLE_CPU_USAGE_LIMIT = 50; // 쓰로틀링을 최소로 유지하는 CPU 사용률(%) private static final int CPU_USAGE_LIMIT = 80; // 쓰로틀링을 최대로 하는 CPU 사용률(%) private static final long MIN_DELAY_TIME = 0L; // 최소 쓰로틀링일 때 지연 시간(ms) private static final long MAX_DELAY_TIME = 500L; // 최대 쓰로틀링일 때 지연 시간(ms) private final CpuUsageMonitorClient cpuUsageMonitorClient; @Override public long calculateDelayTime() { final int cpuUsage = cpuUsageMonitorClient.getCpuUsage(); log.info("CPU Usage: {}", cpuUsage); if (cpuUsage > CPU_USAGE_LIMIT) { return MAX_DELAY_TIME; } if (cpuUsage < ALLOWABLE_CPU_USAGE_LIMIT) { return MIN_DELAY_TIME; } final var calculatorFunction = quadratic( ALLOWABLE_CPU_USAGE_LIMIT, MIN_DELAY_TIME, CPU_USAGE_LIMIT, MAX_DELAY_TIME ); return calculatorFunction.apply(cpuUsage); } /** * (x0, y0), (x1, y1) 좌표를 지나는 2차함수 * y = r(x-x0)^2 + y0 */ private Function quadratic( int x0, long y0, int x1, long y1 ) { // y = r(x-x0)^2 + y0 final var r = (y1 - y0) / Math.pow(x1 - x0, 2); return x -> (long)(r * Math.pow(x - x0, 2) + y0); } }

CpuMonitoringDelayTimeCalculator가 지연 시간을 계산하는 로직은 다음과 같습니다.

객체에는 네가지 상수가 있습니다.
– ALLOWABLE_CPU_USAGE_LIMIT: 쓰로틀링을 최소로 유지하는 CPU 사용률(%)
– CPU_USAGE_LIMIT: 쓰로틀링을 최대로 하는 CPU 사용률(%)
– MIN_DELAY_TIME: 최소 쓰로틀링일 때 지연 시간(ms)
– MAX_PAUSE_TIME: 최대 쓰로틀링일 때 지연 시간(ms)

CPU 사용률을 최대한 낮게만 유지하는 것은 리소스를 제대로 활용하지 못하는 것이기 때문에 CPU 사용률이 ALLOWABLE_CPU_USAGE_LIMIT 보다 낮을 경우는 가장 작은 MIN_DELAY_TIME의 지연 시간을 반환하고, CPU 사용률이 CPU_USAGE_LIMIT 이상일 경우 가장 큰 MAX_DELAY_TIME을 반환합니다.

CPU 사용률이 ALLOWABLE_CPU_USAGE_LIMIT와 CPU_USAGE_LIMIT 사이일 경우에는 MIN_DELAY_TIME에서 MAX_DELAY_TIME로 점점 가파르게 증가하는 2차함수 형태의 지연 시간을 반환합니다.

컨슈머 지연시키기

이제 컨슈머에 지연 시간을 적용해보겠습니다.

Thread sleep

먼저 간단하게 Thread.sleep()으로 지연 시간을 적용하고 장단점을 짚어보겠습니다. 스프링 카프카 리스너를 구성하고, 리스너 메서드 마지막에 Thread.sleep()으로 지연 시간을 적용해보겠습니다. 카프카 리스너는 ConcurrentMessageListenerContainer로 구성하고 concurrency가 3으로 컨슈머 수는 3개입니다. 컨슈머 구성의 자세한 소스 코드는 GitHub 저장소에서 보실 수 있습니다.

@Slf4j public class KafkaThrottledListener { ... private final DelayTimeCalculator delayTimeCalculator; @KafkaListener( ... concurrency = "3" ) public void listen( ... ) throws InterruptedException { log.info("Processing Message: {}, partition: {}, container: {}", ...); // takes 50ms Thread.sleep(50); ack.acknowledge(); log.info("Ack Message: {}, partition: {}, container: {}", ...); // throttling final long delayTime = delayTimeCalculator.calculateDelayTime(); log.info("Delay Time: {}, partition: {}, container: {}", ...); if(delayTime > 0) { Thread.sleep(delayTime); } log.info("Delay End: {}, partition: {}, container: {}", ...); } ... }

테스트는 스프링 카프카에서 제공하는 @EmbeddedKafka를 이용해 in-memory 카프카에서 진행해 보겠습니다. 파티션 6개로 구성해 컨슈머 하나당 두 개의 파티션이 할당되도록 했습니다. 100개의 메시지를 전송하고 로그를 모니터링해 보겠습니다.

@EnableAutoConfiguration @SpringBootTest @EmbeddedKafka( partitions = 6, topics = {TOPIC}, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092", "max.poll.records=5", "auto.create.topics.enable=true", } ) @Slf4j public class KafkaThrottlingTest { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test @DisplayName("Consumer throttling test") void consumerThrottlingTest() throws InterruptedException { IntStream.range(0, 100).forEach(this::sendMessage); Thread.sleep(10000L); } private void sendMessage(Integer i) { try { final var result = kafkaTemplate.send(TOPIC, String.valueOf(i), "message" + i).get(); final var partition = result.getRecordMetadata().partition(); log.info("Sent message: {}, partition: {}", i, partition); } catch (Exception e) { log.error("Error sending message: {}", e.getMessage()); } } }

테스트 로그에서는 100개의 메시지가 각 파티션에 분산 처리되며 CPU 사용률이 늘어나면서 점차 각각의 컨슈머에서 지연 시간이 적용되는 것을 확인할 수 있습니다. 특정 메시지 번호로 로그를 추적해보면 sleep이 끝나기 전까지 해당 파티션의 메시지를 처리하지 않는 것을 확인할 수 있습니다.

Thread.sleep()으로 지연 시간을 적용하는 방법은 간단하지만, 주의할 점이 있습니다. 카프카 브로커는 컨슈머의 상태를 아래 두 가지 요소로 판단하는데 둘 중 하나라도 정상이 아니라면 비정상으로 판단하고 해당 컨슈머를 제외해 리밸런싱을 트리거합니다.

  • 컨슈머가 살아있음을 알리기 위해 별도의 스레드에서 보내는 heartbeat
  • 주기적인 poll() (max.poll.interval.ms 내로 요청이 오는지)

Thread.sleep()을 호출하면 컨슈머의 heartbeat는 유지되지만 sleep 동안 다음 poll()이 발생하지 않아 컨슈머가 비정상으로 판단될 수 있습니다. 각 컨슈머는 poll()메서드를 호출해 최대 max.poll.records만큼의 메시지를 가져오고 리스너로 처리하는데, 리스너가 poll()로 가져온 메시지들을 한 번에 처리하는 Batch 리스너인지 메시지별로 처리하는 Record 리스너인지에 따라 다음과 같이 시간을 고려해야 리밸런싱을 피할 수 있습니다.

  • Record 리스너: max.poll.interval.ms > (메시지당 처리 시간 + sleep시간) * 메시지 수(최대 max.poll.records)
  • Batch 리스너: max.poll.interval.ms > 메시지 전체 처리 시간(최대 max.poll.records) + sleep시간

Thread.sleep()을 이용해 지연 시간을 적용한다면 max.poll.interval.ms를 충분히 길게 두고 sleep 시간을 잘 고려해야 합니다. 하지만 메시지당 처리 시간을 정확하게 알기 어렵고, 설정값이나 지연 시간의 상한 값을 조정할 때 항상 전체 시간을 염두에 두어야 하기 때문에 리밸런싱 위험이 있는 방법입니다.

KafkaConsumer의 pause(), resume()

그렇다면 프로그래밍적인 방법이지만 리밸런싱 위험을 피하면서 지연 시간을 적용하는 방법이 있을까요? 카프카는 특정 파티션의 메시지 수신을 일시적으로 지연할 수 있는 KafkaConsumer의 poll()과 resume() 메서드를 제공합니다. 그리고 스프링에서는 카프카 컨슈머를 추상화한 인터페이스인 MessageListenerContainer로 카프카 컨슈머의 pause(), resume()를 사용할 수 있습니다. 아래는 Spring for Apache Kafka 공식문서의 pause() 설명입니다.

When a container is paused, it continues to poll() the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records. See the Kafka documentation for more information.

그렇습니다. pause 된 컨슈머는 poll()을 멈추는 것이 아니라 지속적으로 poll()을 실행해 리밸런싱을 막으면서 poll()의 결과 records는 빈 형태로 동작합니다.

pause()와 resume()을 사용해 지연 시간을 적용해보겠습니다. MessageListenerContainer를 관리하는 ListenerContainerRegistry를 주입받아 MessageListenerContainer.pause()를 실행할 수 있습니다.

private final ListenerContainerRegistry registry; ... public void pause(String listenerId, long pauseDuration) { MessageListenerContainer container = this.registry.getListenerContainer(LISTENER_ID); Instant resumeAt = Instant.now().plusMillis(pauseDuration); container.pause(); ... }

pause() / resume()의 단위

MessageListenerContainer로 ConcurrentMessageListenerContainer를 사용하는 경우 concurrency 값을 1보다 크게 하면 별도의 스레드에서 컨슈머가 여러 개 생성되는데 ConcurrentMessageListenerContainer 내부에 concurrency 사이즈의 List<MessageListenerContainer>가 있는 형태입니다.

이때 외부 컨테이너에 pause()를 하면 내부의 리스트를 순회하면서 모든 컨테이너에 pause()를 실행합니다.

우리는 컨슈머와 매핑되는 실제 내부의 MessageListenerContainer 단위로 지연을 적용하겠습니다. 메시지 처리에 사용된 컨슈머가 아닌 다른 컨슈머까지 pause()되는 것보다 직관적이기 때문입니다. 좀 더 로직을 추가해서 컨슈머가 받은 메시지인 ConsumerRecord의 파티션으로 ConcurrentMessageListenerContainer 내부에서 해당 파티션이 할당된 MessageListenerContainer를 찾을 수 있습니다.

public void pause( String listenerId, ConsumerRecord<String, String> record, long pauseDuration ) { Instant resumeAt = Instant.now().plusMillis(pauseDuration); getContainer(listenerId, record).pause(); ... } private MessageListenerContainer getContainer( String listenerId, ConsumerRecord<String, String> record ) { final var concurrentContainer = (ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer(LISTENER_ID); return concurrentContainer.getContainerFor(record.topic(), record.partition()); }

resume()을 위한 스레드 풀 크기 결정하기

Thread.sleep()과는 다르게 pause()를 실행하고 일정 시간 뒤 resume()을 수행해야 합니다. 스레드 풀을 이용해 resume 타이밍을 스케줄링하겠습니다.

우리는 ConcurrentMessageListenerContainer내부의 MessageListenerContainer 단위의 pause(), resume()를 적용하기 때문에 중복으로 스케줄링 되지 않는다면 스레드 풀에서 사용되는 스레드 수가 쓰로틀링 대상이 되는 전체 컨슈머 수를 넘지 않습니다. @KafkaListener기준으로는 concurrency를 모두 고려해 결정합니다.

// 쓰로틀링 대상이 되는 MessageListenerContainer의 개수(ConcurrentMessageListenerContainer 내부 포함)를 // 넘어서는 스레드가 사용되지 않으므로 고려해서 크기를 결정한다. private static final int THROTTLING_POOL_SIZE = 3; @Bean public TaskScheduler throttlingTaskScheduler() { return new ThreadPoolTaskSchedulerBuilder() .poolSize(THROTTLING_POOL_SIZE) .build(); }

TaskScheduler.schedule()을 사용해 아래와 같이 지연시간 후 resume() 되도록 하겠습니다.

private final ListenerContainerRegistry registry; private final TaskScheduler scheduler; ... public void pause( String listenerId, ConsumerRecord<String, String> record, long pauseDuration ) { MessageListenerContainer container = getContainer(listenerId, record); Instant resumeAt = Instant.now().plusMillis(pauseDuration); container.pause(); this.scheduler.schedule(() -> { container.resume(); }, resumeAt); } ...

아래는 pause(), resume()을 이용해 바뀐 리스너입니다. pause에는 ListenerContainerPauseService를 사용했는데, ListenerContainerPauseService는 MessageListenerContainer의 pausing과 resuming을 관리하는 스프링에서 제공하는 서비스 클래스로 내부적으로는 위에서 다루었던 방식으로 pause와 resume 스케줄링을 제공합니다.

... private final ListenerContainerPauseService pauser; @KafkaListener( ... concurrency = "3" ) public void listen( ConsumerRecord<String, String> record, @Payload String message, Acknowledgment ack ) throws InterruptedException { final var container = getContainer(record); log.info("Processing Message: {}, partition: {}, container: {}", ... ); // takes 50ms Thread.sleep(50); // throttling final long delayTime = delayTimeCalculator.calculateDelayTime(); log.info("Delay Time: {}, partition: {}, container: {}", ... ); if(delayTime > 0) { pauser.pause(container, Duration.ofMillis(delayTime)); } // acknowledge ack.acknowledge(); log.info("Ack Message: {}, partition: {}, container: {}", ... ); } private MessageListenerContainer getContainer(ConsumerRecord<String, String> record) { final var concurrentContainer = (ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer(LISTENER_ID); return concurrentContainer.getContainerFor(record.topic(), record.partition()); }

바뀐 리스너로 테스트를 실행해보겠습니다. ListenerContainerPauseService 로그를 보기 위해 로깅 레벨을 바꾸겠습니다.

logging.level.org.springframework.kafka.listener.ListenerContainerPauseService=DEBUG

이번에는 좀 더 정확하게 컨테이너 단위로 추적하기 위해 로그에 컨테이너의 listenerId를 추가했습니다. concurrency가 N일때 각 컨슈머의 listenerId는 @KafkaListener의 id 뒤에 0~N 숫자가 부여됩니다. 로그에서 특정 컨테이너의 listenerId로 필터링한 로그만 보겠습니다. (throttled-topic2-listener-2)

테스트 로그를 보면 delayTime이 0보다 커진 이후로 pause()가 적용되는 것을 확인할 수 있습니다. 그런데 이 시점부터 아래 로그가 계속 등장하기 시작하는 것을 확인할 수 있습니다.
"Container KafkaMessageListenerContainer … already has pause requested"

pause()를 시도했지만 해당 컨테이너가 이미 pause 상태일 때 발생하는 메시지입니다. 왜 동일 컨테이너에 지속적으로 pause() 요청이 될까요? 이유는 다음 섹션에서 살펴보도록 하겠습니다.

지연 시점 결정하기

앞서 보았던 것처럼 KafkaConsumer의 pause()는 컨슈머가 poll()을 하는 시점에 빈 records를 가져오는 방식으로 지연을 적용하는 것입니다. 앞의 예제에서 스프링 카프카 리스너는 모두 Record 리스너 였습니다.

Record 리스너는 컨슈머가 poll()로 가져온 최대 max.poll.records 개수의 메시지를 각각 메시지 단위로 처리합니다. 따라서 특정 메시지를 처리 후 pause()를 적용하더라도 가져온 메시지를 모두 처리하기 전에는 지연이 발생하지 않습니다.

결국 KafkaConsumer의 pause(), resume()을 이용해 지연 시간을 적용한다는 것은 최대 max.poll.records만큼의 메시지를 다 처리한 후 지연을 발생시킨다는 것입니다.

이때 Record 리스너에서 메시지를 처리한 후 매번 pause()를 시도하게 되면 pause() 타이밍과 남은 메시지의 처리 속도에 따라 다음 poll()까지 충분한 지연 시간이 적용되지 않을 수 있기 때문에 의도한 대로 지연 시간을 정확하게 조절하기 힘들 수 있습니다. 그리고 앞서 가정했던 pool size보다 많은 스케줄링이 요청됩니다. 반면에 Batch 리스너를 적용하면 poll()에서 가져올 메시지들을 모두 처리한 뒤 정확한 시점에 pause()를 적용할 수 있습니다.

Batch 리스너에서 pause()하기

Batch 리스너로 변경해 쓰로틀링을 적용해보겠습니다. 먼저 ConcurrentKafkaListenerContainerFactory빈을 등록할 때 setBatchListener(true)를 적용하겠습니다.

final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); ... factory.setBatchListener(true);

리스너를 변경해 ConsumerRecords records 파라미터로 리스너 메서드에서 poll()로 가져온 메시지들을 모두 처리하도록 변경했습니다.

@KafkaListener( ... concurrency = "3" ) public void listen( ConsumerRecords<String, String> records, Acknowledgment ack ) { final var container = getContainer(records); records.forEach(record -> { log.info("Processing Message: {}, partition: {}, container: {}", ... ); // takes 50ms ... }); // throttling final long delayTime = delayTimeCalculator.calculateDelayTime(); log.info("Delay Time: {}, partition: {}", ... ); if(delayTime > 0) { pauser.pause(container, Duration.ofMillis(delayTime)); } // acknowledge log.info("Ack partitions: {}, Messages Count: {}", ... ); ack.acknowledge(); }

변경된 Batch 리스너로 테스트를 실행해보겠습니다. 이번에도 특정 컨테이너의 listenerId로 필터링한 로그만 보겠습니다. (throttled-topic3-listener-2)

이미 pause된 컨테이너에 pause를 시도하는 "Container KafkaMessageListenerContainer … already has pause requested" 메시지가 사라지고, 특정 컨테이너의 listenerId로 로그를 추적해 보면 pause()이후 resume()되기 전까지는 메시지를 처리하는 "Processing Message…" 로그가 없는 것을 확인할 수 있습니다.

"Resuming…" 로그 전에 "Pausing container…" 로그가 한 번 더 찍혀 당황스러울 수 있습니다. ListenerContainerPauseService에서 resume()직전에 pausing 로그를 한 번 더 출력해서 그런 것이니 무시하셔도 됩니다. 실제 pause()로그에는 뒤에 "resume scheduled for …"가 붙어있는 차이가 있습니다.

ConsumerInterceptor로 Commit 시점에 지연 적용하기

그렇다면 Record 리스너를 사용하면서 매번 pause()하지 않는 방법은 없을까요?

ConsumerInterceptor를 사용하면 poll()과 commit()시점에 관여할 수 있습니다. 이 인터셉터를 사용해서 Record 리스너를 사용하면서 커밋시점에 지연을 적용해보겠습니다.

하지만 ConsumerInterceptor는 카프카 클라이언트의 API고, 스프링에서 추가적인 통합 지원을 제공하지 않기 때문에 일반적인 스프링의 의존성 주입을 사용할 수 없습니다. 인터셉터의 configure()를 통해 설정에서 주입해야 합니다.

private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); ... // interceptor props.put( ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaThrottlingInterceptor.class.getName() ); props.put("listenerRegistry", this.registry); props.put("pauseService", this.pauser); props.put("pauseTimeCalculator", this.delayTimeCalculator); return new DefaultKafkaConsumerFactory<>(props); }

onCommit() 메서드의 파라미터로 전달되는 Map offsets의 key에는 커밋 대상이 되는 파티션들이 포함됩니다. findTargetContainers() 메서드에는 해당 파티션이 할당된 Set을 찾는 로직을 구현했습니다.

@Slf4j public class KafkaThrottlingInterceptor implements ConsumerInterceptor<String, String> { private KafkaListenerEndpointRegistry registry; private ListenerContainerPauseService pauser; private DelayTimeCalculator delayTimeCalculator; ... @Override public void configure(Map<String, ?> configs) { this.registry = (KafkaListenerEndpointRegistry) configs.get("listenerRegistry"); this.pauser = (ListenerContainerPauseService) configs.get("pauseService"); this.delayTimeCalculator = (DelayTimeCalculator)configs.get("pauseTimeCalculator"); } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { final var targetContainers = findTargetContainers(offsets.keySet(), findConcurrentContainers()); log.info("onCommit. targetContainers: {}, offsets: {}", targetContainers, offsets); targetContainers.forEach(container -> { final long pauseTime = delayTimeCalculator.calculateDelayTime(); log.info("Delay Time: {}, container: {}", ... ); if(pauseTime > 0) { pauser.pause(container, Duration.ofMillis(pauseTime)); } }); } ... }

커밋 시점에 쓰로틀링을 적용할 때 앞선 예제와 마찬가지로 매번 pause()하지 않고 정확하게 지연을 적용하기 위해서는 컨슈머의 AckMode가 poll()로 가져온 메시지들을 모두 처리한 시점에 커밋하는 AckMode.BATCH 혹은 AckMode.MANUAL이어야 합니다.

@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); ... // Ackmode factory.getContainerProperties().setAckMode(AckMode.MANUAL); // BATCH or MANUAL return factory; }

리스너도 지연 시간 적용 코드가 제거되고 Record 리스너로 변경해 간단해졌습니다.

@KafkaListener( ... concurrency = "3" ) public void listen( ConsumerRecord<String, String> record, Acknowledgment ack ) throws InterruptedException { log.info("Processing Message: {}, partition: {}, container: {}", ... ); // takes 50ms Thread.sleep(50); // acknowledge ack.acknowledge(); log.info("Ack Message: {}, partition: {}, container: {}", ... ); }

마지막으로 테스트를 실행해보면, 컨테이너의 listenerId를 기준으로 "onCommit…" 로그 이후에 pause()가 실행되는 것을 확인할 수 있습니다. 그리고 이전과 마찬가지로 pause()된 컨테이너는 resume()되기 전까지는 메시지를 처리하는 "Processing Message…" 로그가 없는 것도 확인할 수 있습니다.

개선 사항

추가적으로 KafkaConsumer의 pause(), resume()을 이용한 지연 시간을 적용할 때 글의 분량상 생략한 부분을 조금 짚어보겠습니다.

실제로 ConsumerInterceptor를 사용해 커밋 시점에 지연 시간을 적용하는 경우라면, 커스텀 애노테이션(@Throttling 등)을 만들어 쓰로틀링이 필요한 리스너에 달아주고 인터셉터에서 식별하는 로직을 추가해야 대상 리스너에만 쓰로틀링을 적용할 수 있습니다.

그리고 Batch 리스너 단위로 지연 시간을 적용한다면 리스너에 직접 관련 코드를 작성하기 보다는 AOP로 분리하면 더 간결하게 적용가능합니다.

또 앞서 언급한 것처럼 pause()와 resume()으로 지연 시간을 적용한다는 것은 최대 max.poll.records만큼의 메시지를 다 처리한 후 지연을 발생시키는 것이기 때문에 지연이 충분히 의미를 가지도록 max.poll.records 크기를 너무 크지 않게 설정해야 합니다.

마치며

카프카 컨슈머에서 프로그래밍적인 방법으로 동적 쓰로틀링이 필요한 경우와 장점, 그리고 여러 방법과 고려사항들을 살펴봤습니다.

본문의 상황처럼 컨슈머를 수평 확장할 때 의존하고 있는 외부 리소스 때문에 쓰로틀링이 필요한 경우가 아니더라도, 여러 상황에서 동적인 쓰로틀링이 필요할 수 있습니다. 이 글에서 다룬 고려사항들이 조금이라도 도움이 되었으면 좋겠습니다.

Read Entire Article