Kafka Clients 제공 Partitioner (kafka-clients 3.6.1v)
- kafka topic에는 partition이란 개념이 있다.
- producer는 각 message를 어떤 partition에 저장할지를 결정해야 하는데 이 책임을 가지고 있는 것이 Partitioner이다.
Interface
public interface Partitioner extends Configurable, Closeable {
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
void close();
/** @deprecated */
@Deprecated
default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
- kafka-clients 3.6.1 version 기준 Partitioner 인터페이스이다.
- partition(): topic, key 등의 정보를 바탕으로 해당 Message가 저장될 partition을 반환한는 메서드
- close(): 더이상 필요하지 않을 때 자원을 해제하기 위해 사용됨
- onNewbatch(): DefaultPartitioner and UniformStickyPartitioner에서 사용되었지만 현재는 Deprecated 상태이다. Partitioner에게 새로운 Batch가 생성 예정임을 알려 고정 파티션을 변경하는 기능이다.
Implementations
- kafka-clients 3.6.1 기준으로 UniformStickyPartitioner, DefaultPartitioner는 @Deprecated된 상태이고 RoundRobinPartitioner만 유지될 예정이다.
1. RoundRobinPartitioner
public class RoundRobinPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
public RoundRobinPartitioner() {
}
public void configure(Map<String, ?> configs) {
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}
public void close() {
}
}
- 이름에서부터 동작 방식을 파악할 수 있듯이 RoundRobin방식으로 Message를 produce한다.
- 로직을 보면 topic별로 AtomicInteger를 하나씩 관리하고, 이 값을 기반으로 파티션을 정하게 된다.
2. BuiltInPartitioner
- 앞서 kafka-clients 3.6.1 version에서는 UniformStickyPartitioner, DefaultPartitioner가 @Deprecated된 것을 확인했다.
- 그렇다면 Partitioner를 지정하지 않았을 때는 어떤 Partitioner를 사용하게 될까?
Kafka Producer의 partition(...) 메서드
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
if (record.partition() != null) {
return record.partition();
} else if (this.partitioner != null) {
int customPartition = this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
} else {
return customPartition;
}
} else {
return serializedKey != null && !this.partitionerIgnoreKeys ? BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size()) : -1;
}
}
- kafka producer는 위 메서드를 사용해서 message의 partition을 결정한다.
- 이때 record에 partition값이 이미 지정된 경우 혹은 partitioner가 지정된 경우가 아니라면 마지막 else 구문에 의해 반환값이 정해진다.
- 코드를 보면 key값이 없거나 key값을 무시하는 설정이 되어있다면 -1
- 그게 아니라면 BuiltInPartitioner의 partitionForKey(...)를 사용한다.
partition(...) 반환값이 -1이라면?
if (partition == -1) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
}
- 위 로직은 KafkaProducer가 Message를 발행하는 로직. 정확히는 RecordAccumulator 클래스의 append(...) 메서드이다.
- partition()메서드의 반환값이 -1이면 첫번째 if문을 타게 된다.
- RecordAccumulator 내에는 topic에 대한 정보를 가진 TopicInfo들이 저장되어 있다.
- 각 TopicInfo는 내부에 BuiltInPartitioner를 가지고 있는데 BuiltInPartitioner가 target partition값인 StickyPartitionInfo를 가지고 있고 이 객체를 기반으로 partition을 할당 받는다.
- 전체적인 동작 방식을 보면 RoundRobin 방식이지만 Message 1개당 Partition을 순차적으로 바꾸지 않는다.
- target partition으로 특정 양(내부 변수 stickyBatchSize)이상을 produce하기 전까지는 partition을 변경하지 않는다.
BuiltInPartitioner.partitionForKey(...)
public static int partitionForKey(byte[] serializedKey, int numPartitions) {
return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}
- Key값이 있는 Message의 경우 BuiltInPartitioner.partitionForKey(...)를 사용해 partition을 정한다.
- 위 코드를 보면 알 수 있듯이 key 기반의 해싱값으로 partition을 할당한다.
- 즉, 동일 key를 가지는 Message는 동일 partition이 보장되는 것이다.