Delvelopment/Kafka
[Spring Boot, kafka] 스프링 프로젝트에 kafka 리스너 적용기.
제제킴
2020. 6. 29. 17:08
반응형
Kafka와 zookeeper 설치와 실행이 완료됬다면 spring 프로젝트에서 접근이 가능하다.
kafka가 로컬에 실행하였다면 간단하게 토픽이 생성되는 것 까지 볼수있다.
1. gradle Kafka 추가.
bulid.gradle
implementation 'org.springframework.kafka:spring-kafka'
2. yml (properties) 셋팅
application.yml
3. Consumer 셋팅
@Slf4j
@EnableKafka
@EnableRetry
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.retry.back-off-period}")
private Integer backOffPeriod;
@Value("${spring.kafka.consumer.retry.max-attempts}")
private Integer maxAttempts;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// allow a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// automatically reset the offset to the earliest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
4. Producer 셋팅
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5. 리스너 (메시지 받는부분) 셋팅
Listener.java
public static final String TOPIC_TEST = "topic_test";
/**
*
* @param message
*/
@KafkaListener(topics = TOPIC_TEST)
public void testListener(String message) {
}
6. 토픽 확인.
./kafka-topics.sh --list --bootstrap-server localhost:9092
bash (터미널) 에 토픽 리스트를 검색하게되면 서버 기동시 topics 들이 자동으로 등록된다.
다음엔 JDBC 를통해 카프카에 메시지를 적재하고 서비스 까지 전달하는 것을 진행해봐야겠다.
반응형