aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java)55
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java3
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy4
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy (renamed from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy)4
4 files changed, 48 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
index b76f86ebeb..514967574f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -45,7 +46,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
@Configuration
@EnableKafka
@RequiredArgsConstructor
-public class KafkaTemplateConfig<T> {
+public class KafkaConfig<T> {
private final KafkaProperties kafkaProperties;
@@ -76,6 +77,32 @@ public class KafkaTemplateConfig<T> {
}
/**
+ * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+ *
+ * @return an instance of legacy Kafka template.
+ */
+ @Bean
+ @Primary
+ public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+ /**
+ * A legacy concurrent kafka listener container factory.
+ *
+ * @return instance of Concurrent kafka listener factory
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(legacyEventConsumerFactory());
+ return containerFactory;
+ }
+
+ /**
* This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
* application.yml with CloudEventSerializer.
*
@@ -99,18 +126,6 @@ public class KafkaTemplateConfig<T> {
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
- /**
- * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
- *
- * @return an instance of legacy Kafka template.
- */
- @Bean
- @Primary
- public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
- final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
- kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
- return kafkaTemplate;
- }
/**
* A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
@@ -124,4 +139,18 @@ public class KafkaTemplateConfig<T> {
return kafkaTemplate;
}
+ /**
+ * A Concurrent CloudEvent kafka listener container factory.
+ *
+ * @return instance of Concurrent kafka listener factory
+ */
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+ cloudEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ containerFactory.setConsumerFactory(cloudEventConsumerFactory());
+ return containerFactory;
+ }
+
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
index b5ca176d1d..88ebd35c88 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java
@@ -52,7 +52,8 @@ public class AvcEventConsumer {
*
* @param avcEventConsumerRecord Incoming raw consumer record
*/
- @KafkaListener(topics = "${app.dmi.cm-events.topic}")
+ @KafkaListener(topics = "${app.dmi.cm-events.topic}",
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) {
log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value());
final String newEventId = UUID.randomUUID().toString();
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
index c0bdf3d1d1..f577f55ba2 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
@@ -69,7 +69,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp
KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
producer.send(record)
and: 'wait a little for async processing of message'
- TimeUnit.MILLISECONDS.sleep(100)
+ TimeUnit.MILLISECONDS.sleep(300)
then: 'the event has only been forwarded for the correct type'
expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
where: 'the following event types are used'
@@ -85,7 +85,7 @@ class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSp
KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer))
producer.send(record)
and: 'wait a little for async processing of message'
- TimeUnit.MILLISECONDS.sleep(100)
+ TimeUnit.MILLISECONDS.sleep(300)
then: 'the event is not processed by this consumer'
0 * mockEventsPublisher.publishCloudEvent(*_)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
index ed5f161258..d5b0915526 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
@@ -34,10 +34,10 @@ import org.springframework.kafka.support.serializer.JsonSerializer
import spock.lang.Shared
import spock.lang.Specification
-@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig])
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
@EnableSharedInjection
@EnableConfigurationProperties
-class KafkaTemplateConfigSpec extends Specification {
+class KafkaConfigSpec extends Specification {
@Shared
@Autowired