kafka基本操作
1. 安装Kafka ,可以使用本地脚本和下载的文件或 docker 镜像以 KRaft 模式运行
下载文件安装不多说,以 KRaft 模式运行如下
1 2
| docker pull apache/kafka:3.9.0 docker run -p 9092:9092 apache/kafka:3.9.0
|
2. 启动zookper
运行以下命令以便以正确的顺序启动所有服务:
1 2
| # Start the ZooKeeper service $ bin/zookeeper-server-start.sh config/zookeeper.properties
|
打开另一个终端会话并运行:
1 2
| # Start the Kafka broker service $ bin/kafka-server-start.sh config/server.properties
|
所有服务成功启动后,您将拥有一个正在运行并可供使用的基本 Kafka 环境。
但我习惯直接进入docker容器进行kafka操作

config是kafka配置文件,bin目录下是操作命令

包含一些命令,消费者命令,生产者命令,主题操作命令,zookeeper命令等
3.常用命令
文档链接 https://kafka.apache.org/documentation/#operations
4.java操作
- 生产者异步发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class CustomizeProducer { public static void main(String[] args) { // 1. 创建 kafka生产者的配置对象 Properties properties = new Properties(); // 2. 给kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 3. 创建 kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用 send方法,发送消息 for (int i = 200; i < 210; i++) { kafkaProducer.send(new ProducerRecord<>("my_topic",2,"","分区二上传数据:----atguigu_pt " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }
|
- 生产者同步发送,调用get
1 2
| // 同步发送 kafkaProducer.send(new ProducerRecord<>("three", "同步发送----kafka" + i)).get();
|
- 事务
1 2 3 4 5 6 7 8 9 10 11 12 13
| Kafka 的事务一共有如下 5 个API // 1 初始化事务 void initTransactions(); // 2 开启事务 void beginTransaction() throws ProducerFencedException; // 3 在事务内提交已经消费的偏移量(主要用于消费者) void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException; // 4 提交事务 void commitTransaction() throws ProducerFencedException; // 5 放弃事务(类似于回滚事务的操作) void abortTransaction() throws ProducerFencedException;
|
- 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class CustomConsumerByHandSync { public static void main(String[] args) { // 1. 创建 kafka消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //3. 创建kafka消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("__consumer_offsets")); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 同步提交offset consumer.commitSync(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class CustomConsumerByHandAsync { public static void main(String[] args) { // 1. 创建 kafka消费者配置类 Properties properties = new Properties(); // 2. 添加配置参数 // 添加连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 配置序列化 必须 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 配置消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); // 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //3. 创建Kafka消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); //4. 设置消费主题 形参是列表 consumer.subscribe(Arrays.asList("my_topic")); //5. 消费数据 while (true){ // 读取消息 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); // 输出消息 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.value()); } // 异步提交offset consumer.commitAsync(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public class CustomConsumerForTime { public static void main(String[] args) { Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); // 1 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅一个主题 ArrayList<String> topics = new ArrayList<>(); topics.add("my_topic"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); // 封装集合存储,每个分区对应一天前的数据 for (TopicPartition topicPartition : assignment) { timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } // 获取从1天前开始消费的每个分区的 offset Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch); // 遍历每个分区,对每个分区设置消费时间。 for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition); // 根据时间指定开始消费的位置 if (offsetAndTimestamp != null){ kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset()); } }// 3 消费该主题数据 while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class CustomConsumerSeek { public static void main(String[] args) { // 0 配置信息 Properties properties = new Properties(); // 连接 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // key value 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2"); // 1 创建一个消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); // 2 订阅一个主题 可订阅多个主题 // 指定要消费的主题和分区 <!-- String topic = "my_topic"; List<TopicPartition> partitions = new ArrayList<>(); partitions.add(new TopicPartition(topic, 0)); // 消费分区 0 partitions.add(new TopicPartition(topic, 1)); // 消费分区 1 // 手动分配分区 kafkaConsumer.assign(partitions); --> ArrayList<String> topics = new ArrayList<>(); topics.add("my_topic"); kafkaConsumer.subscribe(topics); Set<TopicPartition> assignment= new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 获取消费者分区分配信息(有了分区分配信息才能开始消费) assignment = kafkaConsumer.assignment(); } // 遍历所有分区,并指定 offset从1700 的位置开始消费 for (TopicPartition tp: assignment) { kafkaConsumer.seek(tp, 1700); } // 3 消费该主题数据 while (true) { ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }
|