만베거
egg528
Set Concurrency

ConcurrentKafkaListenerContainerFactory.setConcurrency

Kafka와 처리 순서 보장

  • 기본적으로 Kafka Consumer는 메시지 처리의 순서를 보장할 수 없다.
  • 만약 메시지 처리 순서를 보장하고 싶다면 파티션을 1개만 두어야 한다.
  • Partition을 늘릴 경우 개별 Partition 내의 메시지 처리 순서는 보장할 수 있지만, 토픽 전체의 처리 순서는 보장할 수 없다.
  • 즉, Kafka를 쓴다는 건 메시지의 처리 순서를 보장하지 않아도 되는 상황이라는 의미이다.

Kafka와 동시 처리

  • 순서가 보장되지 않아도 된다는 것은 동시 처리를 통해 성능상의 이점을 가질 수 있다는 의미이다.
  • Kafka에서 Message 처리 성능을 높이기 위한 방식 중 대표적인 건 브로커의 Partition 수를 추가하고, 이에 맞게 Consumer 수를 늘리는 일이다.
  • 위와 같이 설정하고 나면 개별 Partition에 쌓이는 Message양은 줄어들게 된다.
  • 이와 동시에 Partition과 Consumer 수를 1:1로 맞추게 된다면 Consumer가 처리해야하는 Message 수는 줄어들게 된다.

Consumer 수를 늘리는 방식

  • 가장 간단하게는 Consumer Application이 동작하는 서버를 Scale Out하면 된다.
  • 하지만 서버의 자원이 많이 남는다면 서버 수를 늘리기 이전에 ConcurrentKafkaListenerContainerFactory와 setConcurrency()메서드를 통해 서버를 늘리지 않고 Message 처리량을 늘릴 수 있다.

AbstractKafkaListenerContainerFactory

  • Spring은 AbstractKafkaListenerContainerFactory의 구현체로 KafkaMessageListenerContainer, ConcurrentKafkaListenerContainerFactory 2가지를 제공한다.
  • ConcurrentKafkaListenerContainerFactory의 경우 setConcurrency() 메서드를 통해 Consumer를 처리하는 Thread 개수를 설정할 수 있다.

ConcurrentKafkaListenerContainerFactory의 setConcurrency()의 역할 코드 타고 들어가보기

1. concurrency값은 무엇인가?

public class ConcurrentKafkaListenerContainerFactory<K, V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
    private Integer concurrency;
 
    public ConcurrentKafkaListenerContainerFactory() {
    }
 
    public void setConcurrency(Integer concurrency) {
        this.concurrency = concurrency;
    }
    ...
}
  • ConcurrentKafkaListenerContainerFactory는 기본적으로 @KafkaListener가 동록된 메서드 기반의 ConcurrentMessageListenerContainer를 생성하는 역할을 한다.
  • 또한 내부적으로 concurrency 값을 가지고 있고 setConcurrency() 메서드를 사용해 concurrency값을 설정한다.

2. ConcurrentMessageListenerContainer 초기화

protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
    super.initializeContainer(instance, endpoint);
    Integer conc = endpoint.getConcurrency();
    if (conc != null) {
        instance.setConcurrency(conc);
    } else if (this.concurrency != null) {
        instance.setConcurrency(this.concurrency);
    }
}
  • ConcurrentKafkaListenerContainerFactory의 initializeContainer 메서드를 통해 ConcurrentMessageListenerContainer의 초기값을 설정한다.
  • 이때 기본 설정 로직은 부모 객체인 ConcurrentMessageListenerContainer의 initializeContainer()를 따라가지만 추가적으로 ConcurrentMessageListenerContainer에 concurrency값을 설정한다.
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList();
    private final List<AsyncTaskExecutor> executors = new ArrayList();
    private final AtomicInteger stoppedContainers = new AtomicInteger();
    private int concurrency = 1;
    private boolean alwaysClientIdSuffix = true;
    private volatile ConsumerStoppedEvent.Reason reason;
    ...
}
  • ConcurrentMessageListenerContainer 내부에도 concurrency값이 존재하며 default는 1이고 ConcurrentKafkaListenerContainerFactory의 concurrency값에 맞춰서 초기화 된다.
  • ConcurrentMessageListenerContainer는 이 값으로 무슨 일을 하는 걸까?
protected void doStart() {
    if (!this.isRunning()) {
        this.checkTopics();
        ContainerProperties containerProperties = this.getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn(() -> {
                return "When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length;
            });
            this.concurrency = topicPartitions.length;
        }
    
        this.setRunning(true);
    
        for(int i = 0; i < this.concurrency; ++i) {
            KafkaMessageListenerContainer<K, V> container = this.constructContainer(containerProperties, topicPartitions, i);
            this.configureChildContainer(i, container);
            if (this.isPaused()) {
                container.pause();
            }
    
            container.start();
            this.containers.add(container);
        }
    }
}
  • ConcurrentMessageListenerContainer의 doStart() 메서드이다.
  • 로직을 살펴보면 ConcurrentMessageListenerContainer가 할당 받은 Partition을 조회하고 concurrency값이 Partition보다 클 경우 Partition 개수를 concurrency로 update한다.
  • 다른 글을 통해 알게된 내용이지만 concurrency는 동시에 처리 가능한 Partition 개수를 뜻한다고 한다.
  • 즉, concurrency가 3이면 3개의 Thread가 생성되어 3개의 Partition을 처리한다.
  • 만약 Thread가 Partition 수보다 크다면 낭비되는 Thread가 생성되는 것이라 생각하면 이 로직이 이해가 된다.
  • 이후 작업을 보면 concurrency 개수만큼의 KafkaMessageListenerContainer를 생성하고
  • configureChildContainer 메서드 내에서 KafkaMessageListenerContainer에 개별 SimpleAsyncTaskExecutor를 등록한다.
  • 즉, concurrency 수만큼의 KafkaMessageListenerContainer가 생기고 각 Container는 개별 Thread로 동작한다.
  • 서버 리소스를 확인해보고 여유가 있다면 concurrency 수를 늘리는 게 비용 측면에서 더 효율적이지 않을까 싶다