Kafka Offset 재설정 하는 방법

# 1. Topic 확인

먼저 내가 가진 토픽이 뭔지 정확히 알아야 하므로 아래 커맨드로 확인 합니다.

1
2
3
4
5
6
7
nakjunizm@localhost[/kafka/bin]./kafka-topics.sh --list --zookeeper localhost:3181
...
MyTopic1
MyTopic2
MyTopic3
__consumer_offsets
...

# 2. consumer all stop

offset을 수정하고자 하는 kafka topic 에 연결되어 있는 모든 consumer들을 shutdown 시킵니다.

# 3. Offset 확인

특정 토픽의 현재 Offset을 확인하는 방법 입니다.

1
2
3
4
5
6
nakjunizm@localhost[/kafka/bin]./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9192 -topic MyTopic1 --time -1
...
MyTopic1:0:216968
MyTopic1:1:217575
MyTopic1:2:220127
MyTopic1:3:221329

각 옵션이 뜻하는것은 아래와 같습니다.
–broker-list <hostname:port,…, REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
–max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
–offsets <Integer: count> number of offsets returned (default: 1) –partitions comma separated list of partition ids. If not specified, it will find offsets for all partitions (default:)
–time <Long: **timestamp/-1(latest)/-2 timestamp of the offsets before that (earliest)** >
–topic REQUIRED: The topic to get offset from.

# 4. Offset 재설정

이제 offset을 재설정 할텐데, 각 consumer group의 partition별로 설정해 줘야 합니다. kafka의 offset정보는 zookeeper에서 받아오기 때문에 zookeeper cli로 설정 합니다.
myconsumer 컨슈머그룹의 MyTopic1 topic 0번 partition의 offset만 100개 줄여 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
nakjunizm@localhost[/zookeeper/bin]./zkCli.sh -server localhost:3181
[zk: localhost:3181(CONNECTED) 0] ls /consumers/myconsumer/offsets/MyTopic1/
0 1 2 3
[zk: localhost:3181(CONNECTED) 0] ls /consumers/myconsumer/offsets/MyTopic1/0
[]
[zk: localhost:3181(CONNECTED) 1] get /consumers/myconsumer/offsets/MyTopic1/0
216968
...

[zk: localhost:3181(CONNECTED) 2] set /consumers/myconsumer/offsets/MyTopic1/0 216868

여기까지 하고 나서 kafka tool 등으로 offset을 조정한 컨슈머그룹의 해당 토픽을 보면
offset이 100 줄어있고 lag이 100 늘어있는것을 확인 할 수 있습니다.

# 5. consumer startup

이제 consumer를 띄워서 바뀐 offset부터 마지막 처리해야 하는 logSize 까지 모두 처리하고 lag이 다시 0이 되는지 확인 합니다.

Share