Purge Kafka Topic












120














I pushed a message that was too big into a Kafka topic on my local machine, and now I'm getting an error:



kafka.common.InvalidMessageSizeException: invalid message size


Increasing the fetch.size is not ideal here, because I don't actually want to accept messages that big. Is there a way to purge the topic in kafka?


































  • This question should be posted on meta.stackoverflow.com :-P
    – kumar_harsh
    Nov 18 '18 at 7:27
















apache-kafka purge














edited Nov 30 '17 at 20:23 by cricket_007

asked Apr 29 '13 at 17:10 by Peter Klipfel
























15 Answers


















297














Temporarily update the retention time on the topic to one second:



kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000


then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms value.

























  • 4




    That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
    – Greg Dubicki
    Nov 13 '15 at 10:38






  • 17




    I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
    – aspergillusOryzae
    Nov 17 '15 at 20:07






  • 8




    Or depending on version: --delete-config retention.ms
    – aspergillusOryzae
    Nov 17 '15 at 20:22






  • 2




    just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
    – Alper Akture
    Nov 18 '15 at 19:42






  • 44




    It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
    – RHE
    Jun 2 '16 at 7:09





















42














Here are the steps I follow to delete a topic named MyTopic:




  1. Describe the topic, and take note of the broker IDs.

  2. Stop the Apache Kafka daemon for each broker ID listed.

  3. Connect to each broker, and delete the topic data folder, e.g. rm -rf /tmp/kafka-logs/MyTopic-0. Repeat for other partitions, and all replicas

  4. Delete the topic metadata: zkCli.sh then rmr /brokers/MyTopic

  5. Start the Apache Kafka daemon for each stopped machine




If you skip step 3, Apache Kafka will continue to report the topic as present (for example, if you run kafka-list-topic.sh).



Tested with Apache Kafka 0.8.0.

























  • 2




    in 0.8.1 ./zookeeper-shell.sh localhost:2181 and ./kafka-topics.sh --list --zookeeper localhost:2181
    – pdeschen
    Jun 2 '14 at 20:55












  • Can use zookeeper-client instead of zkCli.sh (tried on Cloudera CDH5)
    – Martin Tapp
    Nov 11 '14 at 21:12






  • 1




    This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
    – Jeff Maass
    May 29 '15 at 17:13








  • 1




    This was the only way at the time it was written.
    – Thomas Bratt
    May 30 '15 at 16:56






  • 2




    Worked for me on Kafka 0.8.2.1, though the topics in zookeeper were under /brokers/topics/<topic name here>
    – codecraig
    Sep 17 '15 at 10:58



















41














To purge the queue you can delete the topic:



bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test


then re-create it:



bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
























  • 10




    Remember to add line delete.topic.enable=true in file config/server.properties, as the warning printed by the mentioned command says Note: This will have no impact if delete.topic.enable is not set to true.
    – Patrizio Bertoni
    Aug 19 '15 at 13:20












  • This is not always instantaneous. Sometimes the topic is only marked for deletion, and the actual deletion happens later.
    – Gaurav Khare
    yesterday



















34














While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.



kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic


Configurations set via this method can be displayed with the command



kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
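As a rough sketch, the describe and alter commands above can be strung together with a wait and a final reset. The variable names (`KAFKA_CONFIGS`, `ZK`, `TOPIC`, `WAIT_SECS`, `DRY_RUN`) and the `run` helper are assumptions made for illustration, and the `--delete-config` flag is taken from the comments elsewhere in this thread rather than from this answer. By default the script only prints what it would do.

```shell
#!/bin/sh
# Sketch only: every variable and helper name here is an illustrative assumption.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs}"  # may be kafka-configs.sh in your install
ZK="${ZK:-localhost:2181}"
TOPIC="${TOPIC:-MyTopic}"
WAIT_SECS="${WAIT_SECS:-60}"
DRY_RUN="${DRY_RUN:-1}"                          # 1 = print only, 0 = really run

# Print the command; execute it only when DRY_RUN=0.
run() {
  echo "+ $*"
  if [ "$DRY_RUN" = "0" ]; then "$@"; fi
}

purge_topic() {
  # 1. Show the current overrides so a custom retention.ms can be restored later.
  run "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --describe --entity-name "$TOPIC"
  # 2. Shrink retention to one second.
  run "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --alter --add-config retention.ms=1000 --entity-name "$TOPIC"
  # 3. Give the broker time to remove the old log segments.
  run sleep "$WAIT_SECS"
  # 4. Remove the override so the topic falls back to the broker default.
  run "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --alter --delete-config retention.ms --entity-name "$TOPIC"
}

purge_topic
```

If step 1 shows that the topic already had its own retention.ms override, step 4 should probably re-add that value with `--add-config` instead of deleting the setting.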


































    25














    Tested in Kafka 0.8.2, for the quick-start example:
    First, add one line to the server.properties file under the config folder:



    delete.topic.enable=true


    then, you can run this command:



    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
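Before running the delete command, it may help to confirm that the setting is really present and not commented out. The following is a minimal sketch; the function name `delete_enabled` and the scratch file are assumptions for illustration, and in practice you would point it at your own config/server.properties.

```shell
#!/bin/sh
# Sketch only: the function name and the demo file are illustrative assumptions.

# Succeeds when the given properties file contains an uncommented
# delete.topic.enable=true line.
delete_enabled() {
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$1"
}

# Harmless demo on a scratch file rather than a real server.properties.
props="$(mktemp)"
printf '%s\n' 'broker.id=0' '#delete.topic.enable=true' > "$props"
if delete_enabled "$props"; then echo "enabled"; else echo "not enabled"; fi

# Append the uncommented setting and check again.
printf '%s\n' 'delete.topic.enable=true' >> "$props"
if delete_enabled "$props"; then echo "enabled"; else echo "not enabled"; fi
```

The demo reports "not enabled" for the commented-out line and "enabled" once the real line is appended. Since server.properties is read when the broker starts, a change to it needs a broker restart to take effect.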


































      3














      UPDATE: This answer is relevant for Kafka 0.6. For Kafka 0.8 and later see answer by @Patrick.



      Yes, stop Kafka and manually delete all files from the corresponding subdirectory (it's easy to find in the Kafka data directory). After Kafka restarts, the topic will be empty.





























      • This requires bringing down the Broker, and is at best a hack. Steven Appleyard's answer is really the absolute best.
        – Jeff Maass
        May 29 '15 at 17:13












      • @MaasSql I agree. :) This answer is two years old, about version 0.6. "alter topic" and "delete topic" functionality have been implemented later.
        – Wildfire
        May 30 '15 at 19:43












      • Steven Appleyard's answer is just as hacky as this one.
        – Banjocat
        Nov 21 '16 at 15:12










      • Having an application handle deleting its own data in a supported way is far less hacky than turning said application off and deleting what you think are all of its data files then turning it back on.
        – Nick
        Sep 28 '18 at 16:02



















      2














      Kafka doesn't have a direct method to purge or clean up a topic (queue), but you can achieve this by deleting the topic and recreating it.

      First, make sure the server.properties file contains delete.topic.enable=true; if it does not, add it.

      Then delete the topic:
      bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic



      then create it again.



      bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2


































        2














        Sometimes, if you have a saturated cluster (too many partitions, encrypted topic data, SSL, the controller on a bad node, or a flaky connection), it will take a long time to purge the topic.



        I follow these steps, particularly if you're using Avro.



        1: Run with kafka tools :



        bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>


        2: Run on Schema registry node:



        kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning



        3: Set topic retention back to the original setting, once topic is empty.



        bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>


        Hope this helps someone, as it isn't easily advertised.



































          1














          The simplest approach is to set the date of the individual log files to be older than the retention period. Then the broker should clean them up and remove them for you within a few seconds. This offers several advantages:




          1. No need to bring down brokers, it's a runtime operation.

          2. Avoids the possibility of invalid offset exceptions (more on that below).


          In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.



          This could be mitigated by also clearing the consumer offsets in zookeeper for that topic. But if you don't need a virgin topic and just want to remove the existing contents, then simply 'touch'-ing a few topic logs is far easier and more reliable, than stopping brokers, deleting topic logs, and clearing certain zookeeper nodes.
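One way to back-date the log files, sketched under several assumptions: the helper name `backdate_segments`, the "8 days ago" age, and the scratch directory are all illustrative, and `touch -d` with a relative date is a GNU coreutils feature. The answer reports this for Kafka 0.7.x; later versions may decide retention from message timestamps rather than file modification times, so treat this as something to verify on your own version.

```shell
#!/bin/sh
# Sketch only: helper name, age and directory are illustrative assumptions.

# Set the modification time of every *.log file in a directory to the given
# date string (GNU touch syntax, e.g. "8 days ago").
backdate_segments() {
  dir="$1"
  age="$2"
  for f in "$dir"/*.log; do
    [ -e "$f" ] || continue   # skip when the glob matched nothing
    touch -d "$age" "$f"
  done
}

# Harmless demo on a scratch directory instead of real Kafka data.
demo_dir="$(mktemp -d)"
: > "$demo_dir/00000000000000000000.log"
backdate_segments "$demo_dir" "8 days ago"
ls -l "$demo_dir"
```

For a real topic the first argument would be a partition directory such as /tmp/kafka-logs/MyTopic-0, and the age would need to exceed the topic's retention period.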



























          • how to "set the date of the individual log files to be older than the retention period"? thanks
            – bylijinnan
            Mar 24 '16 at 13:05



















          1














          Thomas' advice is great, but unfortunately zkCli in old versions of ZooKeeper (for example 3.3.6) does not seem to support rmr. For example, compare the command line implementation in modern ZooKeeper with that of version 3.3.



          If you are faced with an old version of Zookeeper one solution is to use a client library such as zc.zk for Python. For people not familiar with Python you need to install it using pip or easy_install. Then start a Python shell (python) and you can do:



          import zc.zk
          zk = zc.zk.ZooKeeper('localhost:2181')
          zk.delete_recursive('brokers/MyTopic')


          or even



          zk.delete_recursive('brokers')


          if you want to remove all the topics from Kafka.





































            1














            To clean up all the messages from a particular topic, consume them using your application's consumer group (the group name should be the same as the application's Kafka group name).



            ./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group





























            • There is a problem with this approach (tested in 0.8.1.1). If an application subscribes to two (or more) topics: topic1 and topic2 and the console consumer cleans up topic1, unfortunately it also deletes the unrelated consumer offset for topic2, which causes replay of all messages from topic2.
              – jsh
              Apr 16 '15 at 21:43





















            0














            Could not add this as a comment because of its size.
            I am not sure this is always true, but besides updating retention.ms and retention.bytes, I noticed that the topic's cleanup.policy should be "delete" (the default). If it is "compact", Kafka holds on to messages longer, so you also have to specify delete.retention.ms.



            ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

            Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1



            I also had to check that the earliest and latest offsets were the same to confirm that the purge had actually happened. You can also check disk usage with du -h /tmp/kafka-logs/test-topic-3-100-*



            ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
            26599762



            ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
            26599762



            The other problem is, you have to get current config first so you remember to revert after deletion is successful:
            ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
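The offset comparison above can be wrapped in a small helper. This is a sketch that assumes GetOffsetShell prints one `topic:partition:offset` line per partition, as the awk command in this answer implies; the function name `sum_offsets` and the sample numbers are made up for illustration.

```shell
#!/bin/sh
# Sketch only: function name and sample data are illustrative assumptions.

# Sum the third ':'-separated field (the offset) over all input lines.
sum_offsets() {
  awk -F ":" '{sum += $3} END {print sum + 0}'
}

# Sample lines standing in for GetOffsetShell output
# (--time -1 gives latest offsets, --time -2 gives earliest offsets).
latest="$(printf 'test-topic:0:120\ntest-topic:1:95\n' | sum_offsets)"
earliest="$(printf 'test-topic:0:120\ntest-topic:1:90\n' | sum_offsets)"

if [ "$earliest" -eq "$latest" ]; then
  echo "topic looks empty (earliest == latest == $latest)"
else
  echo "messages remaining: $((latest - earliest))"
fi
```

Against a real cluster, the `printf` calls would be replaced by the two kafka-run-class.sh commands shown above, piped into `sum_offsets`. Because each partition's earliest offset can never exceed its latest, equal sums mean every partition is empty.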



































              0














              Another, rather manual, approach for purging a topic is:



              in the brokers:





              1. stop kafka broker

                sudo service kafka stop


              2. delete all partition log files (should be done on all brokers)
                sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*


              in zookeeper:





              1. run zookeeper command line interface

                sudo /usr/lib/zookeeper/bin/zkCli.sh


              2. use zkCli to remove the topic metadata

                rmr /brokers/topics/<some_topic_name>


              in the brokers again:





              1. restart broker service

                sudo service kafka start





































                0














                From kafka 1.1



                Purge a topic




                bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

                Wait about one minute to be sure that Kafka has purged the topic, then remove the override so the topic returns to the default value:

                bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms




































                  0














                  ./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic


                  This should show the configured retention.ms. You can then use the alter command above to change it to 1 second (and later revert to the default).



                  Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000









































                    Top answer (297 votes): answered Apr 16 '15 at 10:43 by steven appleyard, edited Apr 16 '15 at 13:44








                    • 4




                      That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
                      – Greg Dubicki
                      Nov 13 '15 at 10:38






                    • 17




                      I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
                      – aspergillusOryzae
                      Nov 17 '15 at 20:07






                    • 8




                      Or depending on version: --delete-config retention.ms
                      – aspergillusOryzae
                      Nov 17 '15 at 20:22






                    • 2




                      just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
                      – Alper Akture
                      Nov 18 '15 at 19:42






                    • 44




                      It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
                      – RHE
                      Jun 2 '16 at 7:09
















                    • 4




                      That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
                      – Greg Dubicki
                      Nov 13 '15 at 10:38






                    • 17




                      I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
                      – aspergillusOryzae
                      Nov 17 '15 at 20:07






                    • 8




                      Or depending on version: --delete-config retention.ms
                      – aspergillusOryzae
                      Nov 17 '15 at 20:22






                    • 2




                      just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
                      – Alper Akture
                      Nov 18 '15 at 19:42






                    • 44




                      It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
                      – RHE
                      Jun 2 '16 at 7:09










                    4




                    4




                    That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
                    – Greg Dubicki
                    Nov 13 '15 at 10:38




                    That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
                    – Greg Dubicki
                    Nov 13 '15 at 10:38




                    17




                    17




                    I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
                    – aspergillusOryzae
                    Nov 17 '15 at 20:07




                    I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
                    – aspergillusOryzae
                    Nov 17 '15 at 20:07




                    8




                    8




                    Or depending on version: --delete-config retention.ms
                    – aspergillusOryzae
                    Nov 17 '15 at 20:22




                    Or depending on version: --delete-config retention.ms
                    – aspergillusOryzae
                    Nov 17 '15 at 20:22




                    2




                    2




                    just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
                    – Alper Akture
                    Nov 18 '15 at 19:42




                    just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
                    – Alper Akture
                    Nov 18 '15 at 19:42




                    44




                    44




                    It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
                    – RHE
                    Jun 2 '16 at 7:09






                    It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script. e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000 This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
                    – RHE
                    Jun 2 '16 at 7:09















                    42














                    Here are the steps I follow to delete a topic named MyTopic:




                    1. Describe the topic, and take note of the broker IDs.

                    2. Stop the Apache Kafka daemon for each broker ID listed.

                    3. Connect to each broker, and delete the topic data folder, e.g. rm -rf /tmp/kafka-logs/MyTopic-0. Repeat for other partitions, and all replicas

                    4. Delete the topic metadata: zkCli.sh then rmr /brokers/MyTopic

                    5. Start the Apache Kafka daemon for each stopped machine




                    If you miss step 3, Apache Kafka will continue to report the topic as present (for example, when you run kafka-list-topic.sh).



                    Tested with Apache Kafka 0.8.0.
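
The steps above can be sketched as a small shell helper. This is only a sketch under stated assumptions, not a tested procedure: the function name `manual_topic_delete_plan` and the `LOG_DIR` and `ZK_TOPIC_PATH` variables are made up for illustration, and it assumes partition folders are named `<topic>-<partition>` as in the `MyTopic-0` example. It prints the commands rather than running them, so they can be reviewed before being pasted on each broker.

```shell
#!/bin/sh
# Sketch: print the manual clean-up commands for one topic instead of running them.
# Assumptions (not from the answer): the LOG_DIR and ZK_TOPIC_PATH variables, and
# partition folders named <topic>-<partition number> under the broker's log directory.
manual_topic_delete_plan() {
  topic="$1"
  partitions="$2"   # partition count, taken from the describe step
  log_dir="${LOG_DIR:-/tmp/kafka-logs}"
  zk_path="${ZK_TOPIC_PATH:-/brokers/$topic}"

  echo "# stop the Kafka daemon on every broker that hosts $topic"
  i=0
  while [ "$i" -lt "$partitions" ]; do
    # One data folder per partition; repeat on every broker holding a replica.
    echo "rm -rf $log_dir/$topic-$i"
    i=$((i + 1))
  done
  echo "# in zkCli.sh: rmr $zk_path"
  echo "# start the Kafka daemon again on every stopped broker"
}
```

For example, `manual_topic_delete_plan MyTopic 2` prints two `rm -rf` lines. If the topic metadata lives under a different ZooKeeper path on your version, set `ZK_TOPIC_PATH` accordingly.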














                    edited Nov 23 '18 at 19:50









                    cricket_007











                    answered Feb 19 '14 at 13:32









                    Thomas Bratt








                    • 2




                      in 0.8.1 ./zookeeper-shell.sh localhost:2181 and ./kafka-topics.sh --list --zookeeper localhost:2181
                      – pdeschen
                      Jun 2 '14 at 20:55












                    • Can use zookeeper-client instead of zkCli.sh (tried on Cloudera CDH5)
                      – Martin Tapp
                      Nov 11 '14 at 21:12






                    • 1




                      This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
                      – Jeff Maass
                      May 29 '15 at 17:13








                    • 1




                      This was the only way at the time it was written.
                      – Thomas Bratt
                      May 30 '15 at 16:56






                    • 2




                      Worked for me on Kafka 0.8.2.1, though the topics in ZooKeeper were under /brokers/topics/<topic name here>
                      – codecraig
                      Sep 17 '15 at 10:58

























                    41














                    To purge the queue you can delete the topic:



                    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test


                    then re-create it:



                    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
                      --replication-factor 1 --partitions 1 --topic test
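
The two commands above could be wrapped in one helper, sketched below. The names `recreate_topic`, `run`, `KAFKA_BIN`, `ZK` and `DRY_RUN` are hypothetical and exist only for this illustration. By default the helper only prints the commands; set `DRY_RUN=0` to execute them. Topic deletion may complete later than the delete command returns, so a real script would probably need to wait before re-creating the topic.

```shell
#!/bin/sh
# Sketch: delete a topic and re-create it with a given layout.
# KAFKA_BIN, ZK and DRY_RUN are illustrative variables, not Kafka settings.
KAFKA_BIN="${KAFKA_BIN:-bin}"
ZK="${ZK:-localhost:2181}"

run() {
  # With DRY_RUN=1 (the default in this sketch) only print the command.
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

recreate_topic() {
  topic="$1"
  partitions="${2:-1}"
  replication="${3:-1}"
  run "$KAFKA_BIN/kafka-topics.sh" --zookeeper "$ZK" --delete --topic "$topic"
  # Deletion can be asynchronous; this sketch does not wait for it to finish.
  run "$KAFKA_BIN/kafka-topics.sh" --zookeeper "$ZK" --create \
    --replication-factor "$replication" --partitions "$partitions" --topic "$topic"
}
```

Calling `recreate_topic test` in dry-run mode prints the same two commands shown in the answer.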













                    edited Aug 29 '16 at 14:14









                    Eric Leschinski











                    answered Mar 24 '15 at 12:54









                    rjaiswal








                    • 10




                      Remember to add line delete.topic.enable=true in file config/server.properties, as the warning printed by the mentioned command says Note: This will have no impact if delete.topic.enable is not set to true.
                      – Patrizio Bertoni
                      Aug 19 '15 at 13:20












                    • This is not always instantaneous. Sometimes the topic is only marked for deletion and the actual deletion happens later.
                      – Gaurav Khare
                      yesterday

























                    34














                    While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.



                    kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic


                    Configurations set via this method can be displayed with the command



                    kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
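
A purge along these lines usually has three parts: lower the retention, wait for the broker to discard old data, then remove the override. The sketch below strings them together. The names `purge_by_retention`, `run`, `KAFKA_CONFIGS`, `ZK` and `DRY_RUN` are hypothetical, and the 60-second wait is an assumption: how long the broker needs depends on its retention check interval. The `--delete-config` flag is, as far as I know, the kafka-configs option for removing a per-topic override.

```shell
#!/bin/sh
# Sketch: purge a topic by temporarily lowering retention.ms.
# KAFKA_CONFIGS, ZK and DRY_RUN are illustrative variables, not Kafka settings.
KAFKA_CONFIGS="${KAFKA_CONFIGS:-kafka-configs}"
ZK="${ZK:-localhost:2181}"

run() {
  # With DRY_RUN=1 (the default in this sketch) only print the command.
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

purge_by_retention() {
  topic="$1"
  wait_seconds="${2:-60}"   # assumed wait; tune it to the broker's check interval
  # 1. Keep messages for one second only.
  run "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --alter \
    --add-config retention.ms=1000 --entity-name "$topic"
  # 2. Give the broker time to discard the old log segments.
  run sleep "$wait_seconds"
  # 3. Remove the override so the topic falls back to the broker default.
  run "$KAFKA_CONFIGS" --zookeeper "$ZK" --entity-type topics --alter \
    --delete-config retention.ms --entity-name "$topic"
}
```

Run `purge_by_retention MyTopic` first in dry-run mode to check the commands, then with `DRY_RUN=0`.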















                        answered Apr 21 '16 at 17:56









                        Shane Perry























                            25














                            Tested in Kafka 0.8.2 with the quick-start example.
                            First, add one line to the server.properties file under the config folder:



                            delete.topic.enable=true


                            Then you can run this command:



                            bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
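
Before running the delete command, it can help to check that the setting is really present. The helper below is a minimal sketch; the name `delete_topic_enabled` is made up, and the check is a plain text match that only understands the `key=value` form, so it is a simplification of how properties files are parsed.

```shell
#!/bin/sh
# Sketch: succeed only if the properties file has an uncommented
# delete.topic.enable=true line. The default path is an assumption.
delete_topic_enabled() {
  props="${1:-config/server.properties}"
  grep -Eq '^[[:space:]]*delete\.topic\.enable[[:space:]]*=[[:space:]]*true[[:space:]]*$' "$props"
}
```

A possible use is `delete_topic_enabled config/server.properties || echo "topic deletion is not enabled"`. A broker restart is normally needed after editing server.properties.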















                                answered Jun 14 '15 at 20:02









                                Patrick























                                    3














                                    UPDATE: This answer is relevant for Kafka 0.6. For Kafka 0.8 and later see answer by @Patrick.



                                    Yes: stop Kafka and manually delete all files from the topic's subdirectory (it is easy to find in the Kafka data directory). After Kafka restarts, the topic will be empty.














                                    edited Oct 1 '18 at 10:25

























                                    answered May 1 '13 at 6:56









                                    Wildfire












                                    • This requires bringing down the Broker, and is at best a hack. Steven Appleyard's answer is really the absolute best.
                                      – Jeff Maass
                                      May 29 '15 at 17:13












                                    • @MaasSql I agree. :) This answer is two years old and is about version 0.6. The "alter topic" and "delete topic" functionality was implemented later.
                                      – Wildfire
                                      May 30 '15 at 19:43












                                    • Steven Appleyard's answer is just as hacky as this one.
                                      – Banjocat
                                      Nov 21 '16 at 15:12










                                    • Having an application handle deleting its own data in a supported way is far less hacky than turning said application off and deleting what you think are all of its data files then turning it back on.
                                      – Nick
                                      Sep 28 '18 at 16:02





























                                    2














                                    Kafka doesn't have a direct method to purge or clean up a topic (queue), but you can do it by deleting the topic and re-creating it.



                                    First, make sure the server.properties file contains delete.topic.enable=true; if it doesn't, add it.



                                    Then delete the topic:
                                    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic



                                    Then create it again:



                                    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
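
Because deletion may finish some time after the delete command returns, a script that re-creates the topic straight away can fail. One way around that is to poll the topic list between the two commands, as sketched below. All function names and the `KAFKA_BIN`, `ZK` and `POLL_SECONDS` variables are hypothetical. The sketch also assumes that a topic pending deletion may still be listed with a trailing note such as "marked for deletion", which is why it compares only the first word of each line.

```shell
#!/bin/sh
# Sketch: wait until a deleted topic no longer shows up in the topic list.

# List topic names, one per line. Redefine this function to match your cluster.
list_topics() {
  "${KAFKA_BIN:-bin}/kafka-topics.sh" --zookeeper "${ZK:-localhost:2181}" --list
}

# Succeed if the first word of any input line equals the topic name.
# Comparing the first word also catches lines like "myTopic - marked for deletion".
topic_listed() {
  awk -v t="$1" '$1 == t { found = 1 } END { exit !found }'
}

# Returns 0 once the topic is gone, 1 if it is still listed after all
# attempts, and 2 if listing the topics fails.
wait_until_deleted() {
  topic="$1"
  tries="${2:-30}"
  while [ "$tries" -gt 0 ]; do
    listing=$(list_topics) || return 2
    if ! printf '%s\n' "$listing" | topic_listed "$topic"; then
      return 0
    fi
    tries=$((tries - 1))
    sleep "${POLL_SECONDS:-1}"
  done
  return 1
}
```

A possible flow is: run the delete command, call `wait_until_deleted myTopic`, and only run the create command if it returns 0.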















                                        answered Oct 9 '17 at 10:55









                                        Manish Jaiswal























                                            2














                                            Sometimes, if you have a saturated cluster (too many partitions, encrypted topic data, SSL, a controller on a bad node, or a flaky connection), it will take a long time to purge the topic.



                                            I follow these steps, which are particularly useful if you're using Avro.



                                            1: Run with the Kafka tools to drop the topic's retention to 1 ms:



                                            bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>


                                            2: Run on the Schema Registry node:



                                            kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning



                                            3: Once the topic is empty, set the topic retention back to its original setting:



                                            bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>


                                            Hope this helps someone, as it isn't well advertised.
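The shrink-then-restore steps above are easy to get wrong under pressure, so here is a rough sketch that only prints the two kafka-configs.sh calls for review instead of running them. The helper names retention_cmd and purge_plan and the ZK_HOST variable are my own assumptions; the flags are the ones used in the steps above, and newer Kafka releases may expect --bootstrap-server instead of --zookeeper.

```shell
# Assumed ZooKeeper address; override ZK_HOST for your cluster.
ZK_HOST="${ZK_HOST:-zookeeper01.kafka.com}"

# Hypothetical helper: print (not run) the kafka-configs.sh call that sets retention.ms.
retention_cmd() {
  topic="$1"
  retention_ms="$2"
  printf 'kafka-configs.sh --alter --entity-type topics --zookeeper %s --add-config retention.ms=%s --entity-name %s\n' \
    "$ZK_HOST" "$retention_ms" "$topic"
}

# Print the shrink and restore commands so they can be reviewed and run by hand.
purge_plan() {
  topic="$1"
  original_ms="${2:-604800000}"   # 7 days, the value restored in step 3 above
  retention_cmd "$topic" 1
  echo "# wait until the topic is empty, then:"
  retention_cmd "$topic" "$original_ms"
}
```

Calling `purge_plan my-topic 86400000` prints the command that sets retention.ms=1, a reminder to wait, and the command that restores 86400000. Pass the topic's real original retention as the second argument rather than relying on the 7-day default.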
















                                                answered Feb 15 '18 at 15:30









                                                Ben Coughlan























                                                    1














                                                    The simplest approach is to set the date of the individual log files to be older than the retention period. Then the broker should clean them up and remove them for you within a few seconds. This offers several advantages:




                                                    1. No need to bring down brokers; it's a runtime operation.

                                                    2. Avoids the possibility of invalid offset exceptions (more on that below).


                                                    In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect and request a specific, once-valid offset. If this offset happens to fall outside the bounds of the new topic logs, no harm is done and the consumer resumes at either the beginning or the end. But if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set and fails because the offset doesn't align with an actual message.



                                                    This could be mitigated by also clearing the consumer offsets in ZooKeeper for that topic. But if you don't need a pristine topic and just want to remove the existing contents, then simply 'touch'-ing a few topic logs is far easier and more reliable than stopping brokers, deleting topic logs, and clearing certain ZooKeeper nodes.
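One way to set the file dates is `touch -t`. The sketch below is my own illustration of the idea, not something taken from the answer: the function name backdate_segments and the example directory layout are assumptions. It also relies on the broker using file modification times for retention, as described above for 0.7.x; newer brokers may use record timestamps instead, in which case this has no effect.

```shell
# Hypothetical helper: set the modification time of every regular file in one
# topic-partition directory to a date in the past.
#   $1  partition directory (assumed layout, e.g. /tmp/kafka-logs/myTopic-0)
#   $2  timestamp in `touch -t` format [[CC]YY]MMDDhhmm (defaults to 2000-01-01 00:00)
backdate_segments() {
  log_dir="$1"
  stamp="${2:-200001010000}"
  for f in "$log_dir"/*; do
    # Skip subdirectories and the unexpanded glob when the directory is empty.
    if [ -f "$f" ]; then
      touch -t "$stamp" "$f" || return 1
    fi
  done
}
```

Run it once per partition directory of the topic, for example `backdate_segments /tmp/kafka-logs/myTopic-0`, then wait for the broker's next retention check.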
















                                                    answered Jun 6 '14 at 20:09









                                                    Andrew Carter












                                                    • how to "set the date of the individual log files to be older than the retention period"? thanks
                                                      – bylijinnan
                                                      Mar 24 '16 at 13:05





























                                                    1














                                                    Thomas' advice is great, but unfortunately zkCli in old versions of ZooKeeper (for example 3.3.6) does not seem to support rmr. For example, compare the command line implementation in modern ZooKeeper with version 3.3.



                                                    If you are faced with an old version of ZooKeeper, one solution is to use a client library such as zc.zk for Python. If you are not familiar with Python, install the library using pip or easy_install, then start a Python shell (python) and run:



                                                    import zc.zk
                                                    zk = zc.zk.ZooKeeper('localhost:2181')
                                                    zk.delete_recursive('brokers/MyTopic')


                                                    or even



                                                    zk.delete_recursive('brokers')


                                                    if you want to remove all the topics from Kafka.














                                                        edited Oct 15 '15 at 0:32

























                                                        answered Jun 3 '14 at 15:49









                                                        Mark Butler























                                                            1














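                                                            To clean up all the messages from a particular topic, consume them with your application's consumer group (the group name should be the same as the application's Kafka group name). Note that this moves the group's position past the existing messages rather than deleting them from the broker: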



                                                            ./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group














                                                            edited Nov 30 '17 at 20:24









                                                            cricket_007











                                                            answered Mar 25 '15 at 17:37









                                                            user4713340












                                                            • There is a problem with this approach (tested in 0.8.1.1). If an application subscribes to two (or more) topics: topic1 and topic2 and the console consumer cleans up topic1, unfortunately it also deletes the unrelated consumer offset for topic2, which causes replay of all messages from topic2.
                                                              – jsh
                                                              Apr 16 '15 at 21:43

































                                                            0














Posting this as an answer because it is too long for a comment.
I am not certain of this, but besides updating retention.ms and retention.bytes, I noticed that the topic's cleanup.policy should be "delete" (the default). If it is "compact", Kafka holds on to messages longer, so in that case you also have to specify delete.retention.ms.



                                                            ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

                                                            Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1



To confirm that the purge actually happened, I also had to check that the earliest and latest offsets are the same. You can also check the disk usage with du -h /tmp/kafka-logs/test-topic-3-100-*



                                                            ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
                                                            26599762



                                                            ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
                                                            26599762
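The comparison of the two sums above can be scripted. This is only a sketch: the function names sum_offsets and topic_is_empty are made up for illustration, and it assumes GetOffsetShell prints one topic:partition:offset line per partition, as the awk command in this answer implies.

```shell
#!/bin/sh
# Sum the third colon-separated field (the offset) of GetOffsetShell-style
# lines ("topic:partition:offset") read from stdin.
sum_offsets() {
  awk -F ":" '{sum += $3} END {printf "%.0f\n", sum}'
}

# Succeeds (exit status 0) when the earliest and latest sums match.
# Since latest >= earliest on every partition, equal sums mean that
# no partition still holds messages.
#   $1: GetOffsetShell output for --time -2 (earliest)
#   $2: GetOffsetShell output for --time -1 (latest)
topic_is_empty() {
  earliest=$(printf '%s\n' "$1" | sum_offsets)
  latest=$(printf '%s\n' "$2" | sum_offsets)
  [ "$earliest" = "$latest" ]
}

# Example use against a real broker (not run here):
#   e=$(./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2)
#   l=$(./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1)
#   topic_is_empty "$e" "$l" && echo "topic is purged"
```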



The other catch is that you have to record the current config first, so that you remember what to revert to once the deletion has succeeded:
                                                            ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
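To keep a record of the settings to restore, the describe output can be split into one key=value per line and saved to a file. A rough sketch, assuming the output has the "Configs for topics:<name> are ..." shape shown earlier in this answer (other Kafka versions may print it differently); parse_topic_configs is a hypothetical helper name.

```shell
#!/bin/sh
# Read kafka-configs.sh --describe output on stdin and print each
# topic-level override as its own key=value line.
# Limitation: a value that itself contains a comma would be split too.
parse_topic_configs() {
  sed -n 's/^Configs for topics:[^ ]* are //p' | tr ',' '\n'
}

# Example use (not run here); the backup file name is arbitrary:
#   ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe \
#     --entity-name test-topic-3-100 --entity-type topics \
#     | parse_topic_configs > test-topic-3-100.configs.bak
```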
















                                                                answered Jun 14 '16 at 0:02









kisna























                                                                    0














                                                                    Another, rather manual, approach for purging a topic is:



                                                                    in the brokers:





                                                                    1. stop kafka broker

                                                                      sudo service kafka stop


                                                                    2. delete all partition log files (should be done on all brokers)
                                                                      sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*


                                                                    in zookeeper:





                                                                    1. run zookeeper command line interface

                                                                      sudo /usr/lib/zookeeper/bin/zkCli.sh


                                                                    2. use zkCli to remove the topic metadata

  rmr /brokers/topics/<some_topic_name>


                                                                    in the brokers again:





                                                                    1. restart broker service

                                                                      sudo service kafka start
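Because these steps are destructive, it can help to print them for a given topic and review them before running anything. The sketch below only echoes the commands; print_purge_steps is a hypothetical name, the paths are the ones used in this answer and will differ between installations, and the ZooKeeper path is written as /brokers/topics, which is where Kafka registers topics.

```shell
#!/bin/sh
# Print (do not run) the manual purge steps for one topic.
#   $1: topic name
#   $2: Kafka log directory (defaults to the path used in this answer)
print_purge_steps() {
  topic="$1"
  log_dir="${2:-/kafka-storage/kafka-logs}"
  # Refuse empty or unusual names so the rm glob can never become "-*".
  case "$topic" in
    ""|*[!A-Za-z0-9._-]*)
      echo "invalid topic name: '$topic'" >&2
      return 1
      ;;
  esac
  echo "sudo service kafka stop"
  echo "sudo rm -R $log_dir/$topic-*"
  echo "sudo /usr/lib/zookeeper/bin/zkCli.sh"
  echo "rmr /brokers/topics/$topic"
  echo "sudo service kafka start"
}
```

The fourth printed line is meant to be typed inside the zkCli session opened by the third one.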














                                                                        edited Oct 7 '18 at 7:16

























                                                                        answered Oct 2 '18 at 15:18









Danny Mor























                                                                            0














As of Kafka 1.1, purge a topic by temporarily lowering its retention.ms:




bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100




Wait about a minute to make sure Kafka has purged the topic, then remove the override so that retention.ms goes back to its default value:




bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
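The lower-wait-restore sequence can be wrapped in one function. This is a sketch, not a tested tool: purge_topic and the KAFKA_CONFIGS variable are names invented here, and the --zookeeper flag is simply the one used in this answer (later Kafka releases favour --bootstrap-server).

```shell
#!/bin/sh
# Temporarily set retention.ms=100 on a topic, wait, then drop the override.
#   $1: topic name
#   $2: ZooKeeper address (default localhost:2181)
#   $3: seconds to wait for the broker to delete old segments (default 60)
# Set KAFKA_CONFIGS=echo to print the commands instead of running them.
purge_topic() {
  topic="$1"
  zookeeper="${2:-localhost:2181}"
  wait_seconds="${3:-60}"
  cmd="${KAFKA_CONFIGS:-bin/kafka-configs.sh}"

  # Step 1: shrink retention so existing messages become eligible for deletion.
  "$cmd" --zookeeper "$zookeeper" --alter --entity-type topics \
    --entity-name "$topic" --add-config retention.ms=100 || return 1

  # Step 2: give the broker time to run its retention check.
  sleep "$wait_seconds"

  # Step 3: remove the override; the topic falls back to the broker default.
  "$cmd" --zookeeper "$zookeeper" --alter --entity-type topics \
    --entity-name "$topic" --delete-config retention.ms
}

# Dry run:  KAFKA_CONFIGS=echo purge_topic tp_binance_kline localhost:2181 0
```

Two caveats: the broker only deletes segments when its retention check runs, which is governed by log.retention.check.interval.ms (5 minutes by default), so a wait of one minute may be too short; and --delete-config drops any custom retention.ms the topic had before, so note the old value first if there was one.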

















                                                                                answered Oct 9 '18 at 11:13









user644265























                                                                                    0














                                                                                    ./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic


This should show the configured retention.ms. You can then use the alter command above to change it to 1 second (and later revert it to the default).



                                                                                    Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000
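If you want to capture the current value before changing it, it can be pulled out of that summary line. A small sketch, assuming the "Configs:key=value,..." layout shown above; topic_retention_ms is a hypothetical helper name. It prints nothing when the topic has no retention.ms override.

```shell
#!/bin/sh
# Read kafka-topics.sh --describe output on stdin and print the value of the
# topic-level retention.ms override, if any. Keys such as
# delete.retention.ms are not matched because the key is anchored at the
# start of each split-out entry.
topic_retention_ms() {
  sed -n 's/.*Configs:[[:space:]]*//p' | tr ',' '\n' | sed -n 's/^retention\.ms=//p'
}

# Example use (not run here):
#   old=$(./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic | topic_retention_ms)
```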













                                                                                        edited Nov 18 '18 at 7:22









                                                                                        K.Dᴀᴠɪs











                                                                                        answered Nov 18 '18 at 6:55









tushararora19

















                                                                                            protected by mrsrinivas Dec 22 '18 at 2:07


