# 1. Topic 확인
먼저 내가 가진 토픽이 뭔지 정확히 알아야 하므로 아래 커맨드로 확인 합니다.
1 | nakjunizm@localhost[/kafka/bin]./kafka-topics.sh --list --zookeeper localhost:3181 |
# 2. consumer all stop
offset을 수정하고자 하는 kafka topic 에 연결되어 있는 모든 consumer들을 shutdown 시킵니다.
# 3. Offset 확인
특정 토픽의 현재 Offset을 확인하는 방법 입니다.
1 | nakjunizm@localhost[/kafka/bin]./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9192 -topic MyTopic1 --time -1 |
각 옵션이 뜻하는것은 아래와 같습니다.
–broker-list <hostname:port,…, REQUIRED: The list of hostname and hostname:port> port of the server to connect to.
–max-wait-ms <Integer: ms> The max amount of time each fetch request waits. (default: 1000)
–offsets <Integer: count> number of offsets returned (default: 1) –partitions
–time <Long: **timestamp/-1(latest)/-2 timestamp of the offsets before that (earliest)** >
–topic
# 4. Offset 재설정
이제 offset을 재설정 할텐데, 각 consumer group의 partition별로 설정해 줘야 합니다. kafka의 offset정보는 zookeeper에서 받아오기 때문에 zookeeper cli로 설정 합니다.
myconsumer 컨슈머그룹의 MyTopic1 topic 0번 partition의 offset만 100개 줄여 보겠습니다.
1 | nakjunizm@localhost[/zookeeper/bin]./zkCli.sh -server localhost:3181 |
여기까지 하고 나서 kafka tool 등으로 offset을 조정한 컨슈머그룹의 해당 토픽을 보면
offset이 100 줄어있고 lag이 100 늘어있는것을 확인 할 수 있습니다.
# 5. consumer startup
이제 consumer를 띄워서 바뀐 offset부터 마지막 처리해야 하는 logSize 까지 모두 처리하고 lag이 다시 0이 되는지 확인 합니다.