package kr.wisestone.owl.config; import kr.wisestone.owl.service.impl.AttachedFileServiceImpl; import kr.wisestone.owl.config.kafka.KafkaReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; // edit by zenith : for disable kafka //@EnableKafka @Configuration public class KafkaConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(AttachedFileServiceImpl.class); @Value("${use.kafka}") private boolean useKafka; @Value("${kafka.bootstrap.servers}") private String bootstrapServers; @Value("${kafka.consumer.group.id}") private String groupId; @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(this.producerFactory()); } @Bean public ConsumerFactory consumerFactory() { Map configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(this.consumerFactory()); return factory; } }