summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java9
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy72
2 files changed, 62 insertions, 19 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index ce666b1099..76cc0c4b7b 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.impl.async;
import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -31,6 +32,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
*
*/
@Configuration
+@Slf4j
public class DataOperationRecordFilterStrategy {
/**
@@ -42,8 +44,11 @@ public class DataOperationRecordFilterStrategy {
@Bean
public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
- final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
- consumedRecord.headers(), "ce_type");
+ final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
+ if (eventTypeHeaderValue == null) {
+ log.trace("No ce_type header found, possibly a legacy event (ignored)");
+ return true;
+ }
return !(eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
index 3db8520c29..c0bdf3d1d1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
@@ -1,3 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
package org.onap.cps.ncmp.api.impl.async
import io.cloudevents.CloudEvent
@@ -5,10 +25,12 @@ import io.cloudevents.core.builder.CloudEventBuilder
import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.config.KafkaListenerEndpointRegistry
@@ -23,39 +45,55 @@ import java.util.concurrent.TimeUnit
@EnableAutoConfiguration
class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
+ @SpringBean
+ EventsPublisher mockEventsPublisher = Mock()
+
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
- @SpringBean
- EventsPublisher mockEventsPublisher = Mock()
+ @Value('${app.ncmp.async-m2m.topic}')
+ def topic
- def activateListeners() {
- kafkaListenerEndpointRegistry.getListenerContainers().forEach(
- messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
- )
+ def setup() {
+ activateListeners()
}
- def 'Filtering Events.'() {
+ def 'Filtering Cloud Events on Type.'() {
given: 'a cloud event of type: #eventType'
- def cloudEvent = CloudEventBuilder.v1().withId("some-uuid")
+ def cloudEvent = CloudEventBuilder.v1().withId('some id')
.withType(eventType)
- .withSource(URI.create("sample-test-source"))
- .build();
- and: 'activate message listener container'
- activateListeners()
+ .withSource(URI.create('some-source'))
+ .build()
when: 'send the cloud event'
- ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent)
+ ProducerRecord<String, CloudEvent> record = new ProducerRecord<>(topic, cloudEvent)
KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
- producer.send(record);
+ producer.send(record)
and: 'wait a little for async processing of message'
TimeUnit.MILLISECONDS.sleep(100)
then: 'the event has only been forwarded for the correct type'
- expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _)
- where:
+ expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
+ where: 'the following event types are used'
eventType || expectedNUmberOfCallsToPublishForwardedEvent
'DataOperationEvent' || 1
'other type' || 0
'any type contain the word "DataOperationEvent"' || 1
}
-}
+ def 'Non cloud events on same Topic.'() {
+ when: 'sending a non-cloud event on the same topic'
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, 'simple string event')
+ KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer))
+ producer.send(record)
+ and: 'wait a little for async processing of message'
+ TimeUnit.MILLISECONDS.sleep(100)
+ then: 'the event is not processed by this consumer'
+ 0 * mockEventsPublisher.publishCloudEvent(*_)
+ }
+
+ def activateListeners() {
+ kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+ messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+ )
+ }
+
+}