반응형
Kafka와 zookeeper 설치와 실행이 완료됬다면 spring 프로젝트에서 접근이 가능하다.
kafka가 로컬에 실행하였다면 간단하게 토픽이 생성되는 것 까지 볼수있다.
1. gradle Kafka 추가.
bulid.gradle
implementation 'org.springframework.kafka:spring-kafka'
2. yml (properties) 셋팅
application.yml
3. Consumer 셋팅
@Slf4j
@EnableKafka
@EnableRetry
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.retry.back-off-period}")
private Integer backOffPeriod;
@Value("${spring.kafka.consumer.retry.max-attempts}")
private Integer maxAttempts;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// allow a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// automatically reset the offset to the earliest offset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
4. Producer 셋팅
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
5. 리스너 (메시지 받는부분) 셋팅
Listener.java
public static final String TOPIC_TEST = "topic_test";
/**
*
* @param message
*/
@KafkaListener(topics = TOPIC_TEST)
public void testListener(String message) {
}
6. 토픽 확인.
./kafka-topics.sh --list --bootstrap-server localhost:9092
bash (터미널) 에 토픽 리스트를 검색하게되면 서버 기동시 topics 들이 자동으로 등록된다.
다음엔 JDBC 를통해 카프카에 메시지를 적재하고 서비스 까지 전달하는 것을 진행해봐야겠다.
반응형
'Delvelopment > Kafka' 카테고리의 다른 글
[Kafka] Topic 메세지 보관주기 설정 (MSK) (0) | 2020.10.12 |
---|---|
[kafka Connect] 주기적으로 수행되는 무거운 쿼리 ALL_OBJECTS (table.poll.interval.ms) (3) | 2020.10.12 |
MQ (Message queue)란? (0) | 2020.08.21 |
[MAC, Kafka] 맥에 Kafka 설치 하고 토픽생성. (Docker, homebrew, Apache) (2) | 2020.06.29 |
[Apache Kafka] Kafka 란? (0) | 2020.05.21 |
댓글