Apache Kafka - recurring rebalancing with more consumers
Context
We are using Kafka to process large messages: mostly in the 500 KB range, very occasionally up to 10 MB. Processing a message usually takes up to around 30 seconds, but sometimes as long as a minute.
The Issue
Processing the data with a smaller number of consumers (up to around 50) causes no recurring re-balancing by the broker, and the processing works fine. Any re-balancing at this scale is also rather fast, mostly under a minute according to the broker logs.
Once the consumers are scaled to 100 or 200, they constantly re-balance, at intervals of up to around 5 minutes. This results in roughly 5 minutes of working/consuming followed by 5 minutes of re-balancing, and then the same again. The consumer services don't fail; they just re-balance for no apparent reason. As a result, throughput drops when scaling consumers up.
When scaled to 200 consumers, the processing performs at an average rate of 2 messages per minute per consumer. The processing speed of a single consumer when it is not re-balancing is around 6 messages per minute.
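To put these rates in perspective, here is a small back-of-the-envelope sketch. It only uses the two figures above (2 messages per minute observed, 6 messages per minute when not re-balancing); the class and method names are made up for illustration, and the comparison with 50 consumers assumes that a group of that size really sustains the full 6 messages per minute.

```java
// Back-of-the-envelope estimate based on the rates quoted in the question.
// Names are hypothetical; nothing here talks to Kafka.
final class ThroughputEstimate {

    /** Fraction of wall-clock time a consumer spends doing useful work. */
    static double productiveFraction(double observedPerMinute, double steadyPerMinute) {
        return observedPerMinute / steadyPerMinute;
    }

    /** Total messages per minute across the whole consumer group. */
    static double aggregatePerMinute(int consumers, double perConsumerPerMinute) {
        return consumers * perConsumerPerMinute;
    }

    public static void main(String[] args) {
        // 2 msg/min observed vs. 6 msg/min steady state: about a third of the time is productive.
        System.out.printf("productive fraction: %.2f%n", productiveFraction(2.0, 6.0));
        // 200 consumers at the observed rate.
        System.out.println("200 consumers: " + aggregatePerMinute(200, 2.0) + " msg/min");
        // 50 consumers, ASSUMING they run at the steady-state rate without re-balancing.
        System.out.println(" 50 consumers: " + aggregatePerMinute(50, 6.0) + " msg/min");
    }
}
```

Under that assumption, quadrupling the consumers from 50 to 200 raises the aggregate rate only from about 300 to about 400 messages per minute.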
I don't suspect the network between the data centers to be the issue, as we have other consumers performing a different kind of processing on the messages, and they have no problem handling hundreds to thousands of messages per minute.
Has anyone else experienced this pattern and found a simple solution, e.g. changing a particular configuration parameter?
Additional Information
The Kafka brokers are version 2.0, and there are 10 of them across different data centers. The replication factor is 3, and the topic has 500 partitions. An excerpt of the broker configuration that we adjusted to better suit large messages:
- compression.type=lz4
- message.max.bytes=10000000 # 10 MB
- replica.fetch.max.bytes=10000000 # 10 MB
- group.max.session.timeout.ms=1320000 # 22 min
- offset.retention.minutes=10080 # 7 days
On the consumer side we use the Java client with a re-balance listener that clears any buffered messages from the revoked partitions. This buffer holds 10 messages. The consumer clients run client API version 2.1. Updating the Java client from 2.0 to 2.1 seems to have significantly reduced broker log entries of the following kind at these larger consumer counts (before, we got these for almost every client and every re-balance):
INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
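For readers who want a concrete picture of the buffer and its cleanup on revocation, the following is a minimal sketch, not our actual code. It deliberately avoids the Kafka client library: `Buffered` is a hypothetical stand-in for a consumed record, partitions are plain integers, and `dropRevoked` is the kind of method one would call from `ConsumerRebalanceListener.onPartitionsRevoked` in a real consumer.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Bounded in-memory buffer that can discard records of revoked partitions.
// All names are illustrative; this is a sketch, not the original implementation.
final class PartitionBuffer {

    /** Hypothetical stand-in for a consumed record. */
    static final class Buffered {
        final int partition;
        final String payload;

        Buffered(int partition, String payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    private final int capacity;
    private final Deque<Buffered> queue = new ArrayDeque<>();

    PartitionBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Adds a record unless the buffer is full; returns false when full. */
    synchronized boolean offer(Buffered record) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(record);
        return true;
    }

    /** Removes and returns the oldest record, or null if the buffer is empty. */
    synchronized Buffered poll() {
        return queue.pollFirst();
    }

    /** Drops all buffered records of the given partitions; returns how many were dropped. */
    synchronized int dropRevoked(Collection<Integer> revokedPartitions) {
        Set<Integer> revoked = new HashSet<>(revokedPartitions);
        int before = queue.size();
        queue.removeIf(record -> revoked.contains(record.partition));
        return before - queue.size();
    }

    synchronized int size() {
        return queue.size();
    }
}
```

With a capacity of 10, as in our setup, the polling thread would stop fetching once `offer` returns false and resume when the workers have drained some records.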
The consumers are in a different data center than the brokers. Offsets are committed asynchronously. Polling is performed repeatedly in a thread that fills the buffer, with a poll timeout of 15 seconds; once the buffer is full, the thread sleeps for a few seconds and polls again only when the buffer has free space. An excerpt of the consumer configuration for the use case of larger messages:
- max.partition.fetch.bytes=200000000 # 200 MB
- max.poll.records=2
- session.timeout.ms=1200000 # 20 min
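As a rough illustration, these consumer settings can be written as a plain `java.util.Properties` object using the property keys the Java client accepts (`max.partition.fetch.bytes`, `max.poll.records`, `session.timeout.ms`). The class and method names below are hypothetical, and the check only compares the session timeout against the broker's `group.max.session.timeout.ms` value listed earlier; connection settings such as bootstrap servers and deserializers are omitted.

```java
import java.util.Properties;

// Sketch of the consumer settings from the excerpt; names are illustrative only.
final class ConsumerSettings {

    /** Builds the three consumer settings quoted in the question. */
    static Properties largeMessageConsumerProps() {
        Properties props = new Properties();
        props.setProperty("max.partition.fetch.bytes", "200000000"); // 200 MB
        props.setProperty("max.poll.records", "2");
        props.setProperty("session.timeout.ms", "1200000"); // 20 min
        return props;
    }

    /** True if the consumer's session timeout does not exceed the broker-side maximum. */
    static boolean sessionTimeoutWithinBrokerMax(Properties consumerProps, long brokerMaxSessionTimeoutMs) {
        long sessionTimeoutMs = Long.parseLong(consumerProps.getProperty("session.timeout.ms"));
        return sessionTimeoutMs <= brokerMaxSessionTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = largeMessageConsumerProps();
        // 1320000 ms (22 min) is the group.max.session.timeout.ms from the broker excerpt.
        System.out.println(sessionTimeoutWithinBrokerMax(props, 1_320_000L));
    }
}
```

The 20-minute session timeout fits under the 22-minute broker maximum, so the broker should accept it.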
Log File
The following is an extract from the log of the broker that coordinates this particular group, covering a 30-minute window. Names are reduced to my_group and mytopic. There are also a few entries from an unrelated topic.
19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)
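To make the timing in this extract easier to read, here is a small sketch that pairs each "Preparing to rebalance" entry with the next "Stabilized" entry and reports how long the re-balance took. It assumes the trimmed timestamp format shown above (`HH:mm:ss,SSS]` at the start of the line) and that the extract does not cross midnight; the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Computes re-balance durations from GroupCoordinator log lines (sketch only).
final class RebalanceDurations {

    private static final DateTimeFormatter TIMESTAMP = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Returns the duration in milliseconds of each completed re-balance of the given group. */
    static List<Long> rebalanceMillis(List<String> lines, String group) {
        List<Long> durations = new ArrayList<>();
        LocalTime started = null;
        for (String line : lines) {
            int close = line.indexOf(']');
            if (close < 0) {
                continue; // no timestamp prefix, skip the line
            }
            if (line.contains("Preparing to rebalance group " + group + " ")) {
                started = LocalTime.parse(line.substring(0, close), TIMESTAMP);
            } else if (started != null && line.contains("Stabilized group " + group + " ")) {
                LocalTime ended = LocalTime.parse(line.substring(0, close), TIMESTAMP);
                durations.add(Duration.between(started, ended).toMillis());
                started = null;
            }
        }
        return durations;
    }

    public static void main(String[] args) {
        // First two re-balances from the extract above.
        List<String> excerpt = Arrays.asList(
            "19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
            "19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
            "19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)",
            "19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)");
        System.out.println(rebalanceMillis(excerpt, "my_group")); // prints [55063, 284657]
    }
}
```

For the first two re-balances in the extract this gives about 55 seconds and about 4 minutes 45 seconds, which matches the pattern described in the question.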
Many thanks for any help with this issue.
java apache-kafka kafka-consumer-api
asked Nov 24 '18 at 20:44
calloc_org