Sending a message without checking the result (fire-and-forget)
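    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    private static void plainSendKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // The returned Future is ignored, so delivery failures go unnoticed.
            producer.send(new ProducerRecord<String, String>("test", "Plain Send to Kafka"));
        } catch (Exception e) {
            // Only errors raised before the record is queued (e.g. serialization) land here.
            e.printStackTrace();
        } finally {
            // close() flushes any records still buffered before shutting down.
            producer.close();
        }
    }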
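Synchronous send
The producer's send() method sends the message and returns a Future object. Calling get() on that Future blocks until the broker responds, after which you can tell whether the send succeeded or failed: on success get() returns the record's metadata, and on failure it throws an exception.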
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    private static void syncSendKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // get() blocks until the broker acknowledges the record or the send fails.
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<String, String>("test", "Sync Send to Kafka")).get();
            System.out.printf("Partition : %d, Offset : %d\n", metadata.partition(), metadata.offset());
        } catch (Exception e) {
            // A failed send surfaces here, typically as an ExecutionException.
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
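To show how get() reports the outcome, here is a minimal sketch that uses a plain CompletableFuture as a stand-in for the Future returned by send(). No Kafka classes are involved. The class name FutureGetSketch, the method describe, and the error message are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureGetSketch {

    // Blocks on the future and reports the outcome as a string.
    // Long stands in for the offset that a real send would report.
    static String describe(Future<Long> future) {
        try {
            Long offset = future.get();
            return "success: offset " + offset;
        } catch (ExecutionException e) {
            // get() wraps the original failure; the cause is the real error.
            return "failure: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> ok = CompletableFuture.completedFuture(42L);

        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(describe(ok));     // success: offset 42
        System.out.println(describe(failed)); // failure: broker unavailable
    }
}
```

The same pattern applies to the synchronous send: catching ExecutionException separately and inspecting getCause() tells you why the send failed, which a blanket catch (Exception e) tends to hide.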
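Asynchronous send
Pass a callback to send() along with the record. send() returns without waiting, and the producer invokes the callback once it receives the broker's response.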
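    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    private static void asyncSendKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // send() returns immediately; the callback runs when the broker responds.
            producer.send(new ProducerRecord<String, String>("test", "Async Send to Kafka"),
                    new KafkaCallback());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close() waits for in-flight sends, so the callback fires before the method returns.
            producer.close();
        }
    }

    static class KafkaCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            // Check the exception rather than the metadata: some client versions
            // pass placeholder metadata instead of null when the send fails.
            if (e == null) {
                System.out.println("Partition : " + metadata.partition() + ", Offset : " + metadata.offset());
            } else {
                e.printStackTrace();
            }
        }
    }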
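The order of events in an asynchronous send can be sketched without a broker. In the example below a CompletableFuture plays the role of the pending broker response. The names CallbackSketch, SendCallback, sendAsync and demo are hypothetical, and nothing here is part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CallbackSketch {

    // Hypothetical stand-in for a send callback: error is null on success.
    interface SendCallback {
        void onCompletion(Long offset, Throwable error);
    }

    // Registers the callback on the pending response and returns immediately.
    static void sendAsync(CompletableFuture<Long> pending, SendCallback callback) {
        pending.whenComplete((offset, error) -> callback.onCompletion(offset, error));
    }

    // Runs one simulated send and returns the events in the order they happened.
    static List<String> demo(boolean succeed) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Long> pending = new CompletableFuture<>();

        sendAsync(pending, (offset, error) ->
                log.add(error == null ? "offset " + offset : "error " + error.getMessage()));
        log.add("send returned"); // the caller continues before any response arrives

        // Simulated broker response, delivered after the send call has returned.
        if (succeed) {
            pending.complete(3L);
        } else {
            pending.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // [send returned, offset 3]
        System.out.println(demo(false)); // [send returned, error broker unavailable]
    }
}
```

In both runs "send returned" is logged first, which is the point of the asynchronous style: the caller is never blocked, and success or failure is handled wherever the callback lives.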