SW개발/Kafka

5. Producer (1)

이상한개발자 2020. 9. 18. 02:22

1. 요약

Producer 요약

2. Producer?

 메시지를 생산(Produce)해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 애플리케이션 또는 서버

 

3. 명령어

  •  테스트 메세지 보내기
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list ${kafka broker명+id}:${port} --topic ${메세지를 보낼 topic명} 

 

  • 메세지 확인
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server ${kafka명+id}:${port} --topic ${topic명} --from-beginning

 

4. Java로 Producer 사용해보기

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord

import java.util.Properties

public class Producer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "test-kafka001:9092,test-kafka002:9092,test-kafka003:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, String>("test-topic","hello world!"));
    producer.close();
  }
}

=> prop의 bootstrap.servers는 브로커 리스트를 정의하고, key, value는 String을 사용할 것이므로, 내장된 StringSerializer를 지정

 

=> Producer 객체를 만들어서 설정값을 적용한 후, send를 통해 메세지를 보낼 topic과 메세지를 ProducerRecord에 입력 후 전달

 

4.1 메세지를 보내고 확인하지 않기

Producer<String, String> producer = new KafkaProducer<String, String>(props);
try {
  prodicer.send(new ProducerRecord<String, String>("test-topic", "hello kafka!");
} catch (Exception exception) {
  exception.printStackTrace();
} finally {
  producer.close();
}

 

4.2 동기 전송

Producer<String, String> producer = new KafkaProducer<String, String>(props);
try {
  RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("test-topic", "hello kafka!");
  System.out.printf("Partition: %d, Offset: %d", metadata.partition(), metadata.offset());
} catch (Exception exception) {
  exception.printStackTrace();
} finally {
  producer.close();
}