diff options
Diffstat (limited to 'cps-ncmp-service')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java) | 55 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java | 3 | ||||
-rw-r--r-- | cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy | 4 | ||||
-rw-r--r-- | cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy) | 4 |
4 files changed, 48 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java index b76f86ebeb..514967574f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java @@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -45,7 +46,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer; @Configuration @EnableKafka @RequiredArgsConstructor -public class KafkaTemplateConfig<T> { +public class KafkaConfig<T> { private final KafkaProperties kafkaProperties; @@ -76,6 +77,32 @@ public class KafkaTemplateConfig<T> { } /** + * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. + * + * @return an instance of legacy Kafka template. + */ + @Bean + @Primary + public KafkaTemplate<String, T> legacyEventKafkaTemplate() { + final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); + return kafkaTemplate; + } + + /** + * A legacy concurrent kafka listener container factory. + * + * @return instance of Concurrent kafka listener factory + */ + @Bean + public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + containerFactory.setConsumerFactory(legacyEventConsumerFactory()); + return containerFactory; + } + + /** * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into * application.yml with CloudEventSerializer. * @@ -99,18 +126,6 @@ public class KafkaTemplateConfig<T> { return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); } - /** - * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. - * - * @return an instance of legacy Kafka template. - */ - @Bean - @Primary - public KafkaTemplate<String, T> legacyEventKafkaTemplate() { - final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); - kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); - return kafkaTemplate; - } /** * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this. @@ -124,4 +139,18 @@ public class KafkaTemplateConfig<T> { return kafkaTemplate; } + /** + * A Concurrent CloudEvent kafka listener container factory. + * + * @return instance of Concurrent kafka listener factory + */ + @Bean + public ConcurrentKafkaListenerContainerFactory<String, CloudEvent> + cloudEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + containerFactory.setConsumerFactory(cloudEventConsumerFactory()); + return containerFactory; + } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index b5ca176d1d..88ebd35c88 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -52,7 +52,8 @@ public class AvcEventConsumer { * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}") + @KafkaListener(topics = "${app.dmi.cm-events.topic}", + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); final String newEventId = UUID.randomUUID().toString(); diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy index c0bdf3d1d1..f577f55ba2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy @@ -69,7 +69,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer)) producer.send(record) and: 'wait a little for async processing of message' - TimeUnit.MILLISECONDS.sleep(100) + TimeUnit.MILLISECONDS.sleep(300) then: 'the event has only been forwarded for the correct type' expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_) where: 'the following event types are used' @@ -85,7 +85,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer)) producer.send(record) and: 'wait a little for async processing of message' - TimeUnit.MILLISECONDS.sleep(100) + TimeUnit.MILLISECONDS.sleep(300) then: 'the event is not processed by this consumer' 0 * mockEventsPublisher.publishCloudEvent(*_) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy index ed5f161258..d5b0915526 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy @@ -34,10 +34,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer import spock.lang.Shared import spock.lang.Specification -@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig]) +@SpringBootTest(classes = [KafkaProperties, KafkaConfig]) @EnableSharedInjection @EnableConfigurationProperties -class KafkaTemplateConfigSpec extends Specification { +class KafkaConfigSpec extends Specification { @Shared @Autowired |