본문 바로가기
Delvelopment/Kafka

[Spring Boot, kafka] 스프링 프로젝트에 kafka 리스너 적용기.

by 제제킴 2020. 6. 29.
반응형

Kafka와 zookeeper 설치와 실행이 완료됬다면 spring 프로젝트에서 접근이 가능하다.

 

kafka가 로컬에 실행하였다면 간단하게 토픽이 생성되는 것 까지 볼수있다.

 

 

1. gradle Kafka 추가.

 

bulid.gradle

implementation 'org.springframework.kafka:spring-kafka'

 

2. yml (properties) 셋팅 

 

application.yml

application.yml

3. Consumer 셋팅

@Slf4j
@EnableKafka
@EnableRetry
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.retry.back-off-period}")
    private Integer backOffPeriod;

    @Value("${spring.kafka.consumer.retry.max-attempts}")
    private Integer maxAttempts;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // allow a pool of processes to divide the work of consuming and processing records
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // automatically reset the offset to the earliest offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
 }

 

4. Producer 셋팅 

@Configuration
public class KafkaProducerConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;

	@Bean
	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<>();

		// list of host:port pairs used for establishing the initial connections to the Kafka cluster
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		return props;
	}

	@Bean
	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	@Bean
	public KafkaTemplate<String, String> kafkaTemplate() {
		return new KafkaTemplate<>(producerFactory());
	}

}

 

5. 리스너 (메시지 받는부분) 셋팅 

 

Listener.java

public static final String TOPIC_TEST = "topic_test";
	/**
	 *
	 * @param message
	 */
	@KafkaListener(topics = TOPIC_TEST)
	public void testListener(String message) {
		
	}

 

 

6. 토픽 확인.

./kafka-topics.sh --list --bootstrap-server localhost:9092

 

 

bash (터미널) 에 토픽 리스트를 검색하게되면 서버 기동시 topics 들이 자동으로 등록된다.

 

다음엔 JDBC 를통해 카프카에 메시지를 적재하고 서비스 까지 전달하는 것을 진행해봐야겠다.

반응형

댓글