Purge Kafka Topic
I pushed a message that was too big into a Kafka topic on my local machine, and now I'm getting an error:
kafka.common.InvalidMessageSizeException: invalid message size
Increasing the fetch.size is not ideal here, because I don't actually want to accept messages that big. Is there a way to purge the topic in Kafka?
apache-kafka purge
This question should be posted on meta.stackoverflow.com :-P
– kumar_harsh
Nov 18 '18 at 7:27
edited Nov 30 '17 at 20:23 by cricket_007
asked Apr 29 '13 at 17:10 by Peter Klipfel
15 Answers
Temporarily update the retention time on the topic to one second:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms value.
answered Apr 16 '15 at 10:43 by steven appleyard
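The full cycle can be sketched end to end. This is only a sketch: ZK and TOPIC are placeholder assumptions, and the commands are built into variables and printed rather than executed, so you can review them before running against a real cluster (newer brokers use kafka-configs.sh for this, as the comments note):

```shell
# Sketch of the full purge cycle: shrink retention, wait, restore the default.
# ZK and TOPIC are placeholder assumptions; commands are printed, not executed.
ZK=localhost:2181
TOPIC=MyTopic
describe_cmd="kafka-configs.sh --zookeeper $ZK --describe --entity-type topics --entity-name $TOPIC"
shrink_cmd="kafka-configs.sh --zookeeper $ZK --alter --entity-type topics --entity-name $TOPIC --add-config retention.ms=1000"
restore_cmd="kafka-configs.sh --zookeeper $ZK --alter --entity-type topics --entity-name $TOPIC --delete-config retention.ms"
echo "$describe_cmd"   # note the current retention.ms first
echo "$shrink_cmd"     # then shrink retention to 1 s
# ...wait roughly a minute for the log cleaner to delete old segments...
echo "$restore_cmd"    # finally drop the override to return to the default
```

Capturing the current value with the describe command first avoids guessing what to restore afterwards.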
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
I am not sure about checking the current config, but I believe resetting it back to default looks like: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
Or depending on version: --delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
Just an FYI, for Kafka v0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script, e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
– RHE
Jun 2 '16 at 7:09
Here are the steps I follow to delete a topic named MyTopic:
- Describe the topic, and take note of the broker IDs
- Stop the Apache Kafka daemon for each broker ID listed
- Connect to each broker, and delete the topic data folder, e.g. rm -rf /tmp/kafka-logs/MyTopic-0. Repeat for other partitions, and all replicas
- Delete the topic metadata: zkCli.sh then rmr /brokers/MyTopic
- Start the Apache Kafka daemon for each stopped machine
If you miss step 3, then Apache Kafka will continue to report the topic as present (for example, if you run kafka-list-topic.sh).
Tested with Apache Kafka 0.8.0.
answered Feb 19 '14 at 13:32 by Thomas Bratt (edited Nov 23 '18 by cricket_007)
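The steps above can be sketched for a single local quick-start broker. Paths and script names assume the 0.8.x quick-start layout, and the ZooKeeper path /brokers/topics/<name> is an assumption (one commenter reports that location); the commands are only collected into a variable and printed, not executed, so nothing destructive runs as-is:

```shell
# Sketch of the manual purge for one local quick-start broker. Paths, script
# names, and the ZooKeeper node path are assumptions; commands are collected
# and printed, not executed.
TOPIC=MyTopic
CMDS="bin/kafka-server-stop.sh
rm -rf /tmp/kafka-logs/${TOPIC}-0
bin/zookeeper-shell.sh localhost:2181 rmr /brokers/topics/${TOPIC}
bin/kafka-server-start.sh config/server.properties"
echo "$CMDS"
```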
In 0.8.1: ./zookeeper-shell.sh localhost:2181 and ./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can use zookeeper-client instead of zkCli.sh (tried on Cloudera CDH5)
– Martin Tapp
Nov 11 '14 at 21:12
This deletes the topic, not the data inside of it. This requires that the broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
Worked for me on Kafka 0.8.2.1, though the topics in ZooKeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
To purge the queue, you can delete the topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
then re-create it:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
answered Mar 24 '15 at 12:54 by rjaiswal (edited Aug 29 '16 by Eric Leschinski)
Remember to add the line delete.topic.enable=true in the file config/server.properties, as the warning printed by the mentioned command says: Note: This will have no impact if delete.topic.enable is not set to true.
– Patrizio Bertoni
Aug 19 '15 at 13:20
This is not always instantaneous. Sometimes it will just mark the topic for deletion, and the actual deletion will happen later.
– Gaurav Khare
yesterday
While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Configurations set via this method can be displayed with the command
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Tested in Kafka 0.8.2, for the quick-start example:
First, add one line to the server.properties file under the config folder:
delete.topic.enable=true
then, you can run this command:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
UPDATE: This answer is relevant for Kafka 0.6. For Kafka 0.8 and later see answer by @Patrick.
Yes, stop Kafka and manually delete all files from the corresponding subdirectory (it's easy to find in the Kafka data directory). After restarting Kafka, the topic will be empty.
This requires bringing down the Broker, and is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
@MaasSql I agree. :) This answer is two years old, about version 0.6. "alter topic" and "delete topic" functionality have been implemented later.
– Wildfire
May 30 '15 at 19:43
Steven Appleyard's answer is just as hacky as this one.
– Banjocat
Nov 21 '16 at 15:12
Having an application handle deleting its own data in a supported way is far less hacky than turning said application off and deleting what you think are all of its data files then turning it back on.
– Nick
Sep 28 '18 at 16:02
Kafka doesn't have a direct method for purging/cleaning up a topic (queue), but you can do this by deleting the topic and recreating it.
First, make sure the server.properties file has delete.topic.enable=true; if not, add it.
Then, delete the topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
Then create it again:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Sometimes, if you have a saturated cluster (too many partitions, encrypted topic data, SSL in use, the controller on a bad node, or a flaky connection), it'll take a long time to purge said topic.
I follow these steps, particularly if you're using Avro.
1: Run with Kafka tools:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: Run on Schema registry node:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Once the topic is empty, set topic retention back to the original setting:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Hope this helps someone, as it isn't easily advertised.
The simplest approach is to set the date of the individual log files to be older than the retention period. Then the broker should clean them up and remove them for you within a few seconds. This offers several advantages:
- No need to bring down brokers, it's a runtime operation.
- Avoids the possibility of invalid offset exceptions (more on that below).
In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.
This could be mitigated by also clearing the consumer offsets in zookeeper for that topic. But if you don't need a virgin topic and just want to remove the existing contents, then simply 'touch'-ing a few topic logs is far easier and more reliable, than stopping brokers, deleting topic logs, and clearing certain zookeeper nodes.
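"Setting the date of the log files" can be done with touch. This is only a sketch assuming GNU touch, the quick-start log directory /tmp/kafka-logs, a topic named MyTopic, and the default 7-day retention; adjust LOGDIR and the age to your setup:

```shell
# Back-date every segment of a topic so it falls outside the retention window;
# the broker's retention cleaner then removes the segments on its next pass.
# /tmp/kafka-logs, MyTopic, and the '8 days ago' age (vs. the default 7-day
# retention) are assumptions.
LOGDIR=${LOGDIR:-/tmp/kafka-logs}
touch -d '8 days ago' "$LOGDIR"/MyTopic-*/*.log 2>/dev/null || true
```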
how to "set the date of the individual log files to be older than the retention period"? thanks
– bylijinnan
Mar 24 '16 at 13:05
Thomas' advice is great, but unfortunately zkCli in old versions of Zookeeper (for example 3.3.6) does not seem to support rmr. For example, compare the command line implementation in modern Zookeeper with version 3.3.
If you are faced with an old version of Zookeeper, one solution is to use a client library such as zc.zk for Python. For people not familiar with Python, you need to install it using pip or easy_install. Then start a Python shell (python) and you can do:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
or even
zk.delete_recursive('brokers')
if you want to remove all the topics from Kafka.
To clean up all the messages from a particular topic using your application group (GroupName should be the same as the application's Kafka group name):
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
There is a problem with this approach (tested in 0.8.1.1). If an application subscribes to two (or more) topics: topic1 and topic2 and the console consumer cleans up topic1, unfortunately it also deletes the unrelated consumer offset for topic2, which causes replay of all messages from topic2.
– jsh
Apr 16 '15 at 21:43
Could not add this as a comment because of size:
Not sure if this is true, besides updating retention.ms and retention.bytes, but I noticed the topic cleanup policy should be "delete" (the default); if it is "compact", it is going to hold on to messages longer, i.e., you have to specify delete.retention.ms as well.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Also had to monitor that the earliest/latest offsets are the same to confirm this successfully happened; you can also check with du -h /tmp/kafka-logs/test-topic-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
The other problem is that you have to get the current config first, so you remember to revert it after the deletion is successful:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Another, rather manual, approach for purging a topic is:
in the brokers:
stop kafka broker
sudo service kafka stop
delete all partition log files (should be done on all brokers): sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
in zookeeper:
run zookeeper command line interface
sudo /usr/lib/zookeeper/bin/zkCli.sh
use zkCli to remove the topic metadata
rmr /brokers/topics/<some_topic_name>
in the brokers again:
restart broker service
sudo service kafka start
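The manual steps above can be collected into a small script. This is only a sketch: the broker host names are hypothetical, the 'kafka' service name and paths come from the answer, and the commands are gathered into a plan variable and printed rather than executed, so nothing destructive runs as-is:

```shell
# Sketch of the manual purge above across several brokers. Host names are
# hypothetical; service name and paths follow the answer. Commands are only
# collected and printed, not executed.
TOPIC=some_topic_name
BROKERS="broker1 broker2"          # hypothetical broker hosts
PLAN=""
for h in $BROKERS; do
  PLAN="$PLAN
ssh $h 'sudo service kafka stop; sudo rm -R /kafka-storage/kafka-logs/${TOPIC}-*'"
done
PLAN="$PLAN
sudo /usr/lib/zookeeper/bin/zkCli.sh rmr /brokers/topics/${TOPIC}"
for h in $BROKERS; do
  PLAN="$PLAN
ssh $h 'sudo service kafka start'"
done
echo "$PLAN"
```

Stopping every broker before removing the log directories mirrors the ordering of the original steps: the metadata removal in ZooKeeper happens only while the brokers are down.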
From Kafka 1.1:
Purge a topic:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
Wait one minute, to be sure that Kafka purges the topic, then remove the configuration to go back to the default value:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
This should give the configured retention.ms. Then you can use the alter command above to change it to 1 second (and later revert to the default).
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
protected by mrsrinivas Dec 22 '18 at 2:07
15 Answers
15
active
oldest
votes
15 Answers
15
active
oldest
votes
active
oldest
votes
active
oldest
votes
Temporarily update the retention time on the topic to one second:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms
value.
4
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
17
I am not sure about checking the current config, but I believe resetting it back to default looks like:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
8
Or depending on version:--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
2
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
44
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
– RHE
Jun 2 '16 at 7:09
|
show 6 more comments
Temporarily update the retention time on the topic to one second:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms
value.
4
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
17
I am not sure about checking the current config, but I believe resetting it back to default looks like:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
8
Or depending on version:--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
2
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
44
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
– RHE
Jun 2 '16 at 7:09
|
show 6 more comments
Temporarily update the retention time on the topic to one second:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms
value.
Temporarily update the retention time on the topic to one second:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
then wait for the purge to take effect (about one minute). Once purged, restore the previous retention.ms
value.
edited Apr 16 '15 at 13:44
answered Apr 16 '15 at 10:43
steven appleyardsteven appleyard
2,986264
2,986264
4
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
17
I am not sure about checking the current config, but I believe resetting it back to default looks like:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
8
Or depending on version:--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
2
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
44
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
– RHE
Jun 2 '16 at 7:09
|
show 6 more comments
4
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
17
I am not sure about checking the current config, but I believe resetting it back to default looks like:bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
8
Or depending on version:--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
2
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
44
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>
– RHE
Jun 2 '16 at 7:09
4
4
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
That's a great answer but could you please add a description how to start with checking the topic's current retention.ms value?
– Greg Dubicki
Nov 13 '15 at 10:38
17
17
I am not sure about checking the current config, but I believe resetting it back to default looks like:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
I am not sure about checking the current config, but I believe resetting it back to default looks like:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --deleteConfig retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:07
8
8
Or depending on version:
--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
Or depending on version:
--delete-config retention.ms
– aspergillusOryzae
Nov 17 '15 at 20:22
2
2
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
just an fyi, for kafka v. 0.9.0.0, it says: ubuntu@ip-172-31-21-201:/opt/kafka/kafka_2.10-0.9.0.0-SNAPSHOT$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic room-data --config retention.ms=1000 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases. Going forward, please use kafka-configs.sh for this functionality
– Alper Akture
Nov 18 '15 at 19:42
44
44
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>– RHE
Jun 2 '16 at 7:09
It seems since 0.9.0, using kafka-topics.sh to alter the config is deprecated. The new option is to use the kafka-configs.sh script.
e.g. kafka-configs.sh --zookeeper <zkhost>:2181 --alter --entity-type topics --entity-name <topic name> --add-config retention.ms=1000
This also allows you to check the current retention period, e.g. kafka-configs --zookeeper <zkhost>:2181 --describe --entity-type topics --entity-name <topic name>– RHE
Jun 2 '16 at 7:09
|
show 6 more comments
Here are the steps I follow to delete a topic named MyTopic
:
- Describe the topic, and take not of the broker ids
- Stop the Apache Kafka daemon for each broker ID listed.
- Connect to each broker, and delete the topic data folder, e.g.
rm -rf /tmp/kafka-logs/MyTopic-0
. Repeat for other partitions, and all replicas - Delete the topic metadata:
zkCli.sh
thenrmr /brokers/MyTopic
- Start the Apache Kafka daemon for each stopped machine
If you miss you step 3, then Apache Kafka will continue to report the topic as present (for example when if you run kafka-list-topic.sh
).
Tested with Apache Kafka 0.8.0.
2
in 0.8.1./zookeeper-shell.sh localhost:2181
and./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can usezookeeper-client
instead ofzkCli.sh
(tried on Cloudera CDH5)
– Martin Tapp
Nov 11 '14 at 21:12
1
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
1
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
2
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
|
show 3 more comments
Here are the steps I follow to delete a topic named MyTopic
:
- Describe the topic, and take not of the broker ids
- Stop the Apache Kafka daemon for each broker ID listed.
- Connect to each broker, and delete the topic data folder, e.g.
rm -rf /tmp/kafka-logs/MyTopic-0
. Repeat for other partitions, and all replicas - Delete the topic metadata:
zkCli.sh
thenrmr /brokers/MyTopic
- Start the Apache Kafka daemon for each stopped machine
If you miss you step 3, then Apache Kafka will continue to report the topic as present (for example when if you run kafka-list-topic.sh
).
Tested with Apache Kafka 0.8.0.
2
in 0.8.1./zookeeper-shell.sh localhost:2181
and./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can usezookeeper-client
instead ofzkCli.sh
(tried on Cloudera CDH5)
– Martin Tapp
Nov 11 '14 at 21:12
1
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
1
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
2
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
|
show 3 more comments
Here are the steps I follow to delete a topic named MyTopic
:
- Describe the topic, and take not of the broker ids
- Stop the Apache Kafka daemon for each broker ID listed.
- Connect to each broker, and delete the topic data folder, e.g.
rm -rf /tmp/kafka-logs/MyTopic-0
. Repeat for other partitions, and all replicas - Delete the topic metadata:
zkCli.sh
thenrmr /brokers/MyTopic
- Start the Apache Kafka daemon for each stopped machine
If you miss you step 3, then Apache Kafka will continue to report the topic as present (for example when if you run kafka-list-topic.sh
).
Tested with Apache Kafka 0.8.0.
Here are the steps I follow to delete a topic named MyTopic
:
- Describe the topic, and take not of the broker ids
- Stop the Apache Kafka daemon for each broker ID listed.
- Connect to each broker, and delete the topic data folder, e.g.
rm -rf /tmp/kafka-logs/MyTopic-0
. Repeat for other partitions, and all replicas - Delete the topic metadata:
zkCli.sh
thenrmr /brokers/MyTopic
- Start the Apache Kafka daemon for each stopped machine
If you miss you step 3, then Apache Kafka will continue to report the topic as present (for example when if you run kafka-list-topic.sh
).
Tested with Apache Kafka 0.8.0.
edited Nov 23 '18 at 19:50
cricket_007
79.6k1142110
79.6k1142110
answered Feb 19 '14 at 13:32
Thomas BrattThomas Bratt
26.4k31102121
26.4k31102121
2
in 0.8.1./zookeeper-shell.sh localhost:2181
and./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can usezookeeper-client
instead ofzkCli.sh
(tried on Cloudera CDH5)
– Martin Tapp
Nov 11 '14 at 21:12
1
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
1
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
2
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
|
show 3 more comments
2
in 0.8.1./zookeeper-shell.sh localhost:2181
and./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can usezookeeper-client
instead ofzkCli.sh
(tried on Cloudera CDH5)
– Martin Tapp
Nov 11 '14 at 21:12
1
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
1
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
2
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
2
2
in 0.8.1
./zookeeper-shell.sh localhost:2181
and ./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
in 0.8.1
./zookeeper-shell.sh localhost:2181
and ./kafka-topics.sh --list --zookeeper localhost:2181
– pdeschen
Jun 2 '14 at 20:55
Can use
zookeeper-client
instead of zkCli.sh
(tried on Cloudera CDH5)– Martin Tapp
Nov 11 '14 at 21:12
Can use
zookeeper-client
instead of zkCli.sh
(tried on Cloudera CDH5)– Martin Tapp
Nov 11 '14 at 21:12
1
1
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
This deletes the topic, not the data inside of it. This requires that the Broker be stopped. This is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
1
1
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
This was the only way at the time it was written.
– Thomas Bratt
May 30 '15 at 16:56
2
2
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
Worked for me on Kafka 0.8.2.1, though the topis in zookeeper were under /brokers/topics/<topic name here>
– codecraig
Sep 17 '15 at 10:58
|
show 3 more comments
To purge the queue you can delete the topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
then re-create it:
bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test
10
Remember to add linedelete.topic.enable=true
in fileconfig/server.properties
, as the warning printed by the mentioned command saysNote: This will have no impact if delete.topic.enable is not set to true.
– Patrizio Bertoni
Aug 19 '15 at 13:20
This is not instantaneous always. Sometimes it will just mark for deletion and actual deletion will happen later.
– Gaurav Khare
yesterday
edited Aug 29 '16 at 14:14 by Eric Leschinski
answered Mar 24 '15 at 12:54 by rjaiswal
While the accepted answer is correct, that method has been deprecated. Topic configuration should now be done via kafka-configs.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Configurations set via this method can be displayed with the command
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
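Before lowering retention.ms for a purge, it helps to capture the current value so it can be restored afterwards. A minimal sketch, using a sample describe-style output string in place of a live broker (the topic name and value here are made up):

```shell
# Stand-in for real `kafka-configs ... --describe` output; with a live broker
# you would capture the command's output instead of this sample string.
describe_output="Configs for topics:MyTopic are retention.ms=604800000,cleanup.policy=delete"

# Extract the retention.ms value so it can be restored after the purge
current=$(printf '%s\n' "$describe_output" | grep -o 'retention\.ms=[0-9]*' | cut -d= -f2)
echo "saved retention.ms=$current"
# → saved retention.ms=604800000
```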
answered Apr 21 '16 at 17:56 by Shane Perry
Tested in Kafka 0.8.2, for the quick-start example:
First, add one line to the server.properties file under the config folder:
delete.topic.enable=true
then, you can run this command:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
answered Jun 14 '15 at 20:02 by Patrick
UPDATE: This answer is relevant for Kafka 0.6. For Kafka 0.8 and later see answer by @Patrick.
Yes, stop Kafka and manually delete all files from the corresponding subdirectory (it's easy to find in the Kafka data directory). After the Kafka restart, the topic will be empty.
This requires bringing down the Broker, and is at best a hack. Steven Appleyard's answer is really the absolute best.
– Jeff Maass
May 29 '15 at 17:13
@MaasSql I agree. :) This answer is two years old, about version 0.6. "alter topic" and "delete topic" functionality have been implemented later.
– Wildfire
May 30 '15 at 19:43
Steven Appleyard's answer is just as hacky as this one.
– Banjocat
Nov 21 '16 at 15:12
Having an application handle deleting its own data in a supported way is far less hacky than turning said application off and deleting what you think are all of its data files then turning it back on.
– Nick
Sep 28 '18 at 16:02
edited Oct 1 '18 at 10:25
answered May 1 '13 at 6:56 by Wildfire
Kafka doesn't have a direct method for purging/cleaning up a topic (queue), but you can do this by deleting the topic and recreating it.
First, make sure the server.properties file has delete.topic.enable=true, and add it if not.
Then delete the topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
Then create it again:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
answered Oct 9 '17 at 10:55 by Manish Jaiswal
Sometimes, if you have a saturated cluster (too many partitions, encrypted topic data, SSL in use, the controller on a bad node, or a flaky connection), it can take a long time to purge the topic.
I follow these steps, particularly if you're using Avro.
1: Run with Kafka tools:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: Run on Schema registry node:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Set topic retention back to the original setting, once topic is empty.
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Hope this helps someone, as it isn't easily advertised.
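Before step 3, it's worth confirming the topic is actually empty. One way is to compare the earliest (--time -2) and latest (--time -1) offset sums from GetOffsetShell; once they match, the purge has completed. The values below are placeholders, not output from a real cluster:

```shell
# Placeholder values; in practice these come from GetOffsetShell
# with --time -2 (earliest) and --time -1 (latest)
earliest=26599762
latest=26599762

if [ "$earliest" -eq "$latest" ]; then
  echo "topic empty: safe to restore retention"
else
  echo "still draining: $((latest - earliest)) messages remain"
fi
# → topic empty: safe to restore retention
```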
answered Feb 15 '18 at 15:30 by Ben Coughlan
The simplest approach is to set the date of the individual log files to be older than the retention period. Then the broker should clean them up and remove them for you within a few seconds. This offers several advantages:
- No need to bring down brokers, it's a runtime operation.
- Avoids the possibility of invalid offset exceptions (more on that below).
In my experience with Kafka 0.7.x, removing the log files and restarting the broker could lead to invalid offset exceptions for certain consumers. This would happen because the broker restarts the offsets at zero (in the absence of any existing log files), and a consumer that was previously consuming from the topic would reconnect to request a specific [once valid] offset. If this offset happens to fall outside the bounds of the new topic logs, then no harm and the consumer resumes at either the beginning or the end. But, if the offset falls within the bounds of the new topic logs, the broker attempts to fetch the message set but fails because the offset doesn't align to an actual message.
This could be mitigated by also clearing the consumer offsets in zookeeper for that topic. But if you don't need a virgin topic and just want to remove the existing contents, then simply 'touch'-ing a few topic logs is far easier and more reliable, than stopping brokers, deleting topic logs, and clearing certain zookeeper nodes.
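A sketch of the backdating trick on dummy files (GNU touch assumed; the directory and segment name here are invented, not a real Kafka data dir). In these older Kafka versions the time-based cleaner keyed off the segment file's last-modified time, so backdating a segment past the retention window makes it eligible for deletion on the next cleanup pass:

```shell
# Create a dummy log segment and backdate it past a 7-day retention window
mkdir -p /tmp/demo-kafka-logs/test-0
touch /tmp/demo-kafka-logs/test-0/00000000000000000000.log
touch -d "8 days ago" /tmp/demo-kafka-logs/test-0/00000000000000000000.log

# The segment now looks older than 7 days, so a 7-day retention would reap it
find /tmp/demo-kafka-logs -name '*.log' -mtime +7
```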
how to "set the date of the individual log files to be older than the retention period"? thanks
– bylijinnan
Mar 24 '16 at 13:05
answered Jun 6 '14 at 20:09 by Andrew Carter
Thomas' advice is great, but unfortunately zkCli in old versions of Zookeeper (for example 3.3.6) does not seem to support rmr. For example, compare the command-line implementation in modern Zookeeper with version 3.3.
If you are faced with an old version of Zookeeper, one solution is to use a client library such as zc.zk for Python. For people not familiar with Python, you need to install it using pip or easy_install. Then start a Python shell (python) and you can do:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
or even
zk.delete_recursive('brokers')
if you want to remove all the topics from Kafka.
edited Oct 15 '15 at 0:32
answered Jun 3 '14 at 15:49 by Mark Butler
To clean up all the messages from a particular topic using your application group (the group name should be the same as the application's Kafka group name):
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
There is a problem with this approach (tested in 0.8.1.1). If an application subscribes to two (or more) topics: topic1 and topic2 and the console consumer cleans up topic1, unfortunately it also deletes the unrelated consumer offset for topic2, which causes replay of all messages from topic2.
– jsh
Apr 16 '15 at 21:43
edited Nov 30 '17 at 20:24 by cricket_007
answered Mar 25 '15 at 17:37 by user4713340
Could not add as comment because of size:
Not sure if this is true, but besides updating retention.ms and retention.bytes, I noticed the topic cleanup policy should be "delete" (the default). If it is "compact", it will hold on to messages longer; i.e., if it is "compact", you also have to specify delete.retention.ms.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Also had to monitor that the earliest/latest offsets are the same to confirm this happened successfully; you can also check with du -h /tmp/kafka-logs/test-topic-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
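The awk one-liners above simply sum the third field of GetOffsetShell's topic:partition:offset lines. That can be checked locally with fabricated output (no broker needed; the offsets here are made up):

```shell
# Fabricated GetOffsetShell output: three partitions with offsets 100, 250, 50
printf 'test-topic-3-100:0:100\ntest-topic-3-100:1:250\ntest-topic-3-100:2:50\n' |
  awk -F ":" '{sum += $3} END {print sum}'
# → 400
```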
The other problem is that you have to get the current config first, so you remember to revert it after the deletion is successful:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
answered Jun 14 '16 at 0:02
kisna
Another, rather manual, approach for purging a topic is:
in the brokers:
stop kafka broker
sudo service kafka stop
delete all partition log files (should be done on all brokers): sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
in zookeeper:
run zookeeper command line interface
sudo /usr/lib/zookeeper/bin/zkCli.sh
use zkCli to remove the topic metadata
rmr /brokers/topics/<some_topic_name>
in the brokers again:
restart broker service
sudo service kafka start
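The steps above can be sketched as a single script. This is a hypothetical sketch (service name, log directory, and zkCli path are taken from the answer and may differ on your install; note the ZooKeeper znode is /brokers/topics/<topic>, plural). It defaults to dry-run, only printing the commands, because the rm and rmr steps are destructive:

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the manual purge steps above.
# Defaults to DRY_RUN=1 (print commands only); set DRY_RUN=0 to execute.
set -euo pipefail
TOPIC="${1:-some_topic_name}"
LOG_DIR="${LOG_DIR:-/kafka-storage/kafka-logs}"
DRY_RUN="${DRY_RUN:-1}"

run() { if [ "$DRY_RUN" = "1" ]; then echo "+ $*"; else "$@"; fi; }

run sudo service kafka stop
run sudo rm -R "$LOG_DIR/$TOPIC"-*                        # repeat on every broker
run sudo /usr/lib/zookeeper/bin/zkCli.sh rmr "/brokers/topics/$TOPIC"
run sudo service kafka start
```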
edited Oct 7 '18 at 7:16
answered Oct 2 '18 at 15:18
Danny Mor
From Kafka 1.1:
Purge a topic:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100
wait a minute or so, to be sure that Kafka purges the topic, then
remove the override so the topic reverts to the default value:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
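The two-step sequence above can be wrapped in a small helper. This sketch only prints the commands so they can be reviewed (or piped to sh) before touching a real cluster; the topic name and ZooKeeper address are the example values from the answer:

```shell
# Print the purge-and-revert command sequence from the steps above for review.
purge_cmds() {
  local zk="$1" topic="$2"
  echo "bin/kafka-configs.sh --zookeeper $zk --alter --entity-type topics --entity-name $topic --add-config retention.ms=100"
  echo "sleep 60  # give the log cleaner time to delete old segments"
  echo "bin/kafka-configs.sh --zookeeper $zk --alter --entity-type topics --entity-name $topic --delete-config retention.ms"
}

purge_cmds localhost:2181 tp_binance_kline
```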
answered Oct 9 '18 at 11:13
user644265
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic
Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000
This shows the retention.ms currently configured. You can then use the alter command above to change it to 1 second (and later revert it to the default).
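A small parsing sketch for saving the value you will need to restore later: extract the current retention.ms from the describe output (the sample line is the one shown above):

```shell
# Extract retention.ms from a kafka-topics.sh --describe line so the original
# value can be saved and restored after the purge.
line='Topic:myTopic PartitionCount:6 ReplicationFactor:1 Configs:retention.ms=86400000'
current=$(printf '%s\n' "$line" | sed -n 's/.*retention\.ms=\([0-9]*\).*/\1/p')
echo "$current"   # prints 86400000
```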
edited Nov 18 '18 at 7:22
K.Dᴀᴠɪs
answered Nov 18 '18 at 6:55
tushararora19