Apache Kafka - recurring rebalancing with more consumers

Context



We use Kafka to process large messages: mostly around 500 KB, very occasionally up to 10 MB. Processing a single message can take up to around 30 seconds, sometimes a minute.



The Issue



With fewer consumers (up to around 50), the broker does not rebalance the group repeatedly, and processing works fine. Any rebalance at this scale is also fast, mostly completing in under a minute according to the broker logs.



Once we scale to 100 or 200 consumers, the group rebalances constantly, at intervals of up to around 5 minutes. The result is a cycle of roughly 5 minutes of consuming followed by 5 minutes of rebalancing, repeated over and over. The consumer services don't fail; the group just rebalances for no apparent reason. This reduces throughput when we scale the consumers up.



With 200 consumers, processing averages 2 messages per minute per consumer. A single consumer processes around 6 messages per minute when no rebalance is in progress.
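A quick back-of-the-envelope comparison of these figures. It assumes that with 50 consumers each one also reaches the ~6 messages per minute it achieves between rebalances; the 50-consumer rate is not stated above, so that line is an estimate. `ThroughputEstimate` is just an illustrative name.

```java
// Back-of-the-envelope group throughput from the per-consumer rates above.
// The 50-consumer figure ASSUMES each consumer reaches ~6 msg/min (not measured).
class ThroughputEstimate {
    static double groupRatePerMinute(int consumers, double perConsumerPerMinute) {
        return consumers * perConsumerPerMinute;
    }

    public static void main(String[] args) {
        double observed200 = groupRatePerMinute(200, 2.0); // observed: 2 msg/min per consumer
        double ideal200 = groupRatePerMinute(200, 6.0);    // if no rebalance were running
        double assumed50 = groupRatePerMinute(50, 6.0);    // assumption, see above

        System.out.printf("200 consumers: %.0f of %.0f msg/min (%.0f%%)%n",
                observed200, ideal200, 100 * observed200 / ideal200);
        System.out.printf("50 consumers, no recurring rebalancing: %.0f msg/min%n", assumed50);
    }
}
```

Under that assumption, quadrupling the consumer count from 50 to 200 raises group throughput only from about 300 to about 400 messages per minute, a third of what 200 consumers could handle without rebalancing.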



I don't suspect the network between the data centers, because other consumers that perform a different kind of processing on these messages have no trouble handling hundreds to thousands of messages per minute.



Has anyone else experienced this pattern and found a simple solution, e.g. changing a particular configuration parameter?



Additional Information



The Kafka brokers run version 2.0; there are 10 of them, spread across different data centers. The replication factor is 3, and this topic has 500 partitions. An excerpt of the broker configuration adapted for processing large messages:




  • compression.type=lz4

  • message.max.bytes=10000000 # 10 MB

  • replica.fetch.max.bytes=10000000 # 10 MB

  • group.max.session.timeout.ms=1320000 # 22 min

  • offset.retention.minutes=10080 # 7 days
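For reference, a sketch of the same broker settings in `server.properties` form, loaded with `java.util.Properties` (the standard Java parser for that file format) to double-check the unit annotations. One caveat: `Properties` only treats `#` as a comment at the start of a line, so if the `# 10 MB`-style annotations were copied verbatim into a real `server.properties`, the comment text would become part of the value. `BrokerSettings` is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// The broker excerpt as server.properties text. Comments sit on their own lines,
// because java.util.Properties only recognises '#' at the start of a line.
class BrokerSettings {
    static final String SERVER_PROPERTIES = String.join("\n",
            "compression.type=lz4",
            "# 10 MB",
            "message.max.bytes=10000000",
            "# 10 MB",
            "replica.fetch.max.bytes=10000000",
            "# 22 min",
            "group.max.session.timeout.ms=1320000",
            "# 7 days",
            "offset.retention.minutes=10080");

    static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) { // cannot really happen with a StringReader
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = load(SERVER_PROPERTIES);
        long maxSessionMs = Long.parseLong(p.getProperty("group.max.session.timeout.ms"));
        long retentionMin = Long.parseLong(p.getProperty("offset.retention.minutes"));
        System.out.println(TimeUnit.MILLISECONDS.toMinutes(maxSessionMs) + " min"); // 22 min
        System.out.println(TimeUnit.MINUTES.toDays(retentionMin) + " days");        // 7 days

        // An inline comment is kept as part of the value: prints "10000000 # 10 MB"
        System.out.println(load("message.max.bytes=10000000 # 10 MB").getProperty("message.max.bytes"));
    }
}
```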


On the consumer side we use the Java client with a rebalance listener that discards any buffered messages belonging to revoked partitions. The buffer holds 10 messages. The consumers run Java client version 2.1; upgrading the client from 2.0 to 2.1 seems to have significantly reduced broker log entries of the following kind at these larger consumer counts (before the upgrade we got one for almost every client on every rebalance):
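A minimal sketch of how such a buffer and listener could fit together; it is not the actual code from this setup. In the real client the purge would run inside `ConsumerRebalanceListener.onPartitionsRevoked(Collection<TopicPartition>)`, registered via `consumer.subscribe(topics, listener)`. To keep the example runnable on the JDK alone, partitions are plain `int`s and `PartitionedBuffer` is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the 10-message buffer. Each entry remembers its
// partition so the rebalance listener can discard entries of revoked partitions.
class PartitionedBuffer {
    static final class Entry {
        final int partition;
        final long offset;
        final String value;

        Entry(int partition, long offset, String value) {
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    private final int capacity;
    private final Deque<Entry> entries = new ArrayDeque<>();

    PartitionedBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Adds an entry if there is room; returns false when the buffer is full. */
    synchronized boolean offer(Entry e) {
        if (entries.size() >= capacity) {
            return false;
        }
        entries.addLast(e);
        return true;
    }

    synchronized int size() {
        return entries.size();
    }

    /** What onPartitionsRevoked would do: drop buffered entries of revoked partitions. */
    synchronized int dropRevoked(Collection<Integer> revokedPartitions) {
        int before = entries.size();
        entries.removeIf(e -> revokedPartitions.contains(e.partition));
        return before - entries.size();
    }

    public static void main(String[] args) {
        PartitionedBuffer buffer = new PartitionedBuffer(10);
        buffer.offer(new Entry(1, 100, "a"));
        buffer.offer(new Entry(2, 200, "b"));
        buffer.offer(new Entry(1, 101, "c"));
        int dropped = buffer.dropRevoked(List.of(1));
        System.out.println("dropped " + dropped + ", remaining " + buffer.size()); // dropped 2, remaining 1
    }
}
```

The methods are `synchronized` because, in the setup described, the polling thread fills the buffer while processing and the listener touch it as well.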



INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)


The consumers are in a different data center than the brokers. Offsets are committed asynchronously. Polling runs in a separate thread that fills the buffer, using a poll timeout of 15 seconds; once the buffer is full, the thread sleeps for a few seconds and polls again only when the buffer has free space. An excerpt of the consumer configuration for large messages:




  • max.partition.fetch.bytes.config=200000000 # 200 MB

  • max.poll.records.config=2

  • session.timeout.ms.config=1200000 # 20 min
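The keys in this excerpt end in `.config`, which appears to mirror the `ConsumerConfig.*_CONFIG` constants of the Java client (e.g. `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`); the property strings behind them are `max.partition.fetch.bytes`, `max.poll.records` and `session.timeout.ms`. The sketch below builds those settings as plain `java.util.Properties` and checks the session timeout against the broker's allowed range: the group coordinator rejects a join request whose `session.timeout.ms` lies outside `[group.min.session.timeout.ms, group.max.session.timeout.ms]`. The 6000 ms minimum is the broker default and is an assumption here, since it is not part of the broker excerpt. A real consumer would also need `bootstrap.servers`, `group.id` and deserializers before being passed to `new KafkaConsumer<>(props)`; `ConsumerSettings` is a hypothetical name.

```java
import java.util.Properties;

// Consumer settings from the excerpt, written with the plain property keys.
class ConsumerSettings {
    // Broker-side bounds for a member's session.timeout.ms.
    static final long GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;     // broker default, ASSUMED unchanged
    static final long GROUP_MAX_SESSION_TIMEOUT_MS = 1_320_000; // group.max.session.timeout.ms (22 min)

    static Properties largeMessageSettings() {
        Properties p = new Properties();
        p.setProperty("max.partition.fetch.bytes", "200000000"); // 200 MB
        p.setProperty("max.poll.records", "2");
        p.setProperty("session.timeout.ms", "1200000");          // 20 min
        return p;
    }

    /** True if the coordinator would accept this session timeout on join. */
    static boolean sessionTimeoutAccepted(Properties p) {
        long t = Long.parseLong(p.getProperty("session.timeout.ms"));
        return t >= GROUP_MIN_SESSION_TIMEOUT_MS && t <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        Properties p = largeMessageSettings();
        System.out.println("session.timeout.ms within broker bounds: " + sessionTimeoutAccepted(p)); // true
    }
}
```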

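A minimal, hypothetical model of the polling thread described above, using only the JDK: a `Supplier` stands in for `consumer.poll(Duration.ofSeconds(15))` and an `ArrayBlockingQueue` for the 10-message buffer. One design choice worth noting: the sketch polls only when a full batch of `max.poll.records` fits, so records returned by a poll never have to be dropped or block the polling thread. Sleeping and real threads are left out to keep it deterministic; `PollingLoop` is an illustrative name.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical model of the polling thread. `fetch` stands in for
// consumer.poll(Duration.ofSeconds(15)) so the sketch runs on the JDK alone.
class PollingLoop {
    private final BlockingQueue<String> buffer;
    private final Supplier<List<String>> fetch;
    private final int maxPollRecords; // a single poll returns at most this many records
    private int polls = 0;

    PollingLoop(int capacity, int maxPollRecords, Supplier<List<String>> fetch) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.maxPollRecords = maxPollRecords;
        this.fetch = fetch;
    }

    /**
     * One loop iteration. Polls only if a full batch fits into the buffer;
     * otherwise returns false, which is where the real thread would sleep.
     */
    boolean step() {
        if (buffer.remainingCapacity() < maxPollRecords) {
            return false;
        }
        polls++;
        for (String record : fetch.get()) {
            buffer.add(record); // fits, assuming fetch honours maxPollRecords
        }
        return true;
    }

    /** Called by the processing side; returns null when the buffer is empty. */
    String nextForProcessing() {
        return buffer.poll();
    }

    int polls() {
        return polls;
    }

    public static void main(String[] args) {
        // Buffer of 10 and max.poll.records=2 as in the question; each poll returns 2 records.
        PollingLoop loop = new PollingLoop(10, 2, () -> List.of("r1", "r2"));
        while (loop.step()) {
            // keep polling until the buffer is full
        }
        System.out.println("polls until buffer full: " + loop.polls()); // 5
        loop.nextForProcessing();
        System.out.println("polled again after one record: " + loop.step()); // false, only 1 slot free
    }
}
```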

Log File



The following is an extract, covering about 30 minutes, from the log of the broker that coordinates this group. Names are reduced to my_group and mytopic, and a few entries from an unrelated topic are included.



19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)
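To quantify the pattern, a small hypothetical helper that pairs each `Preparing to rebalance` entry with the next `Stabilized` entry for the same group and reports how long the rebalance took. It relies only on the date-truncated line format shown in the extract and assumes all entries fall on the same day; the sample lines are abridged from the extract.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: for one consumer group, measures the time between each
// "Preparing to rebalance" entry and the following "Stabilized" entry.
// Assumes the date-truncated line format of the extract and a single calendar day.
class RebalanceLog {
    private static final Pattern LINE = Pattern.compile(
            "(\\d{2}):(\\d{2}):(\\d{2}),(\\d{3})\\] INFO \\[GroupCoordinator \\d+\\]: "
                    + "(Preparing to rebalance|Stabilized) group (\\S+)");

    // Abridged sample taken from the broker log extract.
    static final List<String> SAMPLE = List.of(
            "19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18)",
            "19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18)",
            "19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18)",
            "19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18)",
            "19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002",
            "19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18)",
            "20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18)",
            "20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18)",
            "20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18)",
            "20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18)");

    private static long millisOfDay(Matcher m) {
        long hours = Long.parseLong(m.group(1));
        long minutes = Long.parseLong(m.group(2));
        long seconds = Long.parseLong(m.group(3));
        long millis = Long.parseLong(m.group(4));
        return ((hours * 60 + minutes) * 60 + seconds) * 1000 + millis;
    }

    /** Duration in milliseconds of every completed rebalance of {@code group}. */
    static List<Long> rebalanceDurationsMs(List<String> lines, String group) {
        List<Long> durations = new ArrayList<>();
        Long preparingAt = null; // start of the rebalance in progress, if any
        for (String line : lines) {
            Matcher m = LINE.matcher(line.trim());
            if (!m.lookingAt() || !m.group(6).equals(group)) {
                continue; // unrelated entry or a different group
            }
            long t = millisOfDay(m);
            if (m.group(5).startsWith("Preparing")) {
                preparingAt = t;
            } else if (preparingAt != null) {
                durations.add(t - preparingAt);
                preparingAt = null;
            }
        }
        return durations;
    }

    public static void main(String[] args) {
        for (long ms : rebalanceDurationsMs(SAMPLE, "my_group")) {
            System.out.printf("rebalance took %.1f s%n", ms / 1000.0);
        }
    }
}
```

For this extract it reports about 55 s, 285 s, 234 s and 283 s: apart from the first, each rebalance took roughly 4 to 5 minutes.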


Many thanks for any help with this issue.

java apache-kafka kafka-consumer-api

asked Nov 24 '18 at 20:44 by calloc_org
