1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| Properties properties = new Properties(); // 2. 给kafka 配置对象添加配置信息:bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // key,value 序列化(必须):key.serializer,value.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // batch.size:批次大小,默认16K properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // linger.ms:等待时间,默认0 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // RecordAccumulator:缓冲区大小,默认32M:buffer.memory properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); // compression.type:压缩,默认 none,可配置值 gzip、snappy、 lz4和 zstd properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); // 3. 创建 kafka生产者对象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 4. 调用 send方法,发送消息 for (int i = 0; i < 5; i++) { kafkaProducer.send(new ProducerRecord<>("first","atguigu " + i)); } // 5. 关闭资源 kafkaProducer.close(); } }
|