[Kafka] Producer Send

메세지를 보내고 확인하지 않기

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static void plainSendKafka() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
try {
producer.send(new ProducerRecord<String, String>("test", "Plain Send to Kafka"));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}

동기 전송

카프카 프로듀서는 메세지를 보내고 send() 메소드의 Future객체를 리턴한다. get()을 통해 Future객체를 기다린 후 성공과 실패 여부를 확인한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void syncSendKafka() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
try {
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("test", "Sync Send to Kafka")).get();
System.out.printf("Partition : %d, Offset : %d\n", metadata.partition(), metadata.offset());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}

비동기 전송

send() 메소드와 콜백을 같이 호출해 응답을 받을 경우 콜백을 호출하도록 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static void asyncSendKafka() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
try {
producer.send(new ProducerRecord<String, String>("test", "Async Send to Kafka"), new kafkaCallback());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}

static class kafkaCallback implements Callback {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (metadata != null) {
System.out.println("Partition : " + metadata.partition() + ", Offset : " + metadata.offset() + "");
} else {
e.printStackTrace();
}
}
}
Author: Song Hayoung
Link: https://songhayoung.github.io/2020/07/13/kafka/producer_send/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.