diff options
19 files changed, 179 insertions, 313 deletions
diff --git a/cps-application/src/main/java/org/onap/cps/config/WebSecurityConfig.java b/cps-application/src/main/java/org/onap/cps/config/WebSecurityConfig.java index 93a3a6ed2b..aedc6a8d66 100644 --- a/cps-application/src/main/java/org/onap/cps/config/WebSecurityConfig.java +++ b/cps-application/src/main/java/org/onap/cps/config/WebSecurityConfig.java @@ -2,6 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (c) 2021 Bell Canada. * Modification Copyright (C) 2021 Pantheon.tech + * Modification Copyright (C) 2023 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,11 +23,14 @@ package org.onap.cps.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; -import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.core.userdetails.UserDetails; +import org.springframework.security.provisioning.InMemoryUserDetailsManager; +import org.springframework.security.web.SecurityFilterChain; /** * Configuration class to implement application security. @@ -34,7 +38,7 @@ import org.springframework.security.config.annotation.web.configuration.WebSecur */ @Configuration @EnableWebSecurity -public class WebSecurityConfig extends WebSecurityConfigurerAdapter { +public class WebSecurityConfig { private static final String USER_ROLE = "USER"; @@ -60,23 +64,43 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter { this.password = password; } - @Override + /** + * Return the configuration for secure access to the modules REST end points. + * + * @param http the HTTP security settings. + * @return the HTTP security settings. + */ + @Bean // The team decided to disable default CSRF Spring protection and not implement CSRF tokens validation. // CPS is a stateless REST API that is not as vulnerable to CSRF attacks as web applications running in // web browsers are. CPS does not manage sessions, each request requires the authentication token in the header. // See https://docs.spring.io/spring-security/site/docs/5.3.8.RELEASE/reference/html5/#csrf @SuppressWarnings("squid:S4502") - protected void configure(final HttpSecurity http) throws Exception { + public SecurityFilterChain filterChain(final HttpSecurity http) throws Exception { http - .csrf().disable() - .authorizeRequests() - .antMatchers(permitUris).permitAll() - .anyRequest().authenticated() - .and().httpBasic(); + .httpBasic() + .and() + .authorizeRequests() + .antMatchers(permitUris).permitAll() + .anyRequest().authenticated() + .and() + .csrf().disable(); + + return http.build(); } - @Override - protected void configure(final AuthenticationManagerBuilder auth) throws Exception { - auth.inMemoryAuthentication().withUser(username).password("{noop}" + password).roles(USER_ROLE); + /** + * In memory user authentication details. + * + * @return in memory authetication + */ + @Bean + public InMemoryUserDetailsManager userDetailsService() { + final UserDetails user = User.builder() + .username(username) + .password("{noop}" + password) + .roles(USER_ROLE) + .build(); + return new InMemoryUserDetailsManager(user); } } diff --git a/cps-application/src/test/groovy/org/onap/cps/rest/controller/ControllerSecuritySpec.groovy b/cps-application/src/test/groovy/org/onap/cps/rest/controller/ControllerSecuritySpec.groovy index 5c255f1dac..ccadc57240 100755 --- a/cps-application/src/test/groovy/org/onap/cps/rest/controller/ControllerSecuritySpec.groovy +++ b/cps-application/src/test/groovy/org/onap/cps/rest/controller/ControllerSecuritySpec.groovy @@ -20,6 +20,9 @@ package org.onap.cps.rest.controller +import org.onap.cps.config.WebSecurityConfig +import org.springframework.context.annotation.Import + import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get import org.springframework.beans.factory.annotation.Autowired @@ -29,6 +32,7 @@ import org.springframework.test.web.servlet.MockMvc import spock.lang.Specification @WebMvcTest(TestController) +@Import(WebSecurityConfig) class ControllerSecuritySpec extends Specification { @Autowired diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json deleted file mode 100644 index bbcadcd0f4..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-headers-1.0.0.json +++ /dev/null @@ -1,55 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-headers:1.0.0", - "$ref": "#/definitions/BatchEventHeaders", - "definitions": { - "BatchEventHeaders": { - "description": "The header information of the Batch event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchEventHeadersV1", - "properties": { - "eventId": { - "description": "The unique id for identifying the event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id received by NCMP as an acknowledgement.", - "type": "string" - }, - "eventTime": { - "description": "The time of the event. It should be in RFC format ('yyyy-MM-dd'T'HH:mm:ss.SSSZ').", - "type": "string" - }, - "eventTarget": { - "description": "The destination topic to forward the consumed event.", - "type": "string" - }, - "eventSource": { - "description": "The source of the event.", - "type": "string" - }, - "eventType": { - "description": "The type of the Batch event.", - "type": "string" - }, - "eventSchema": { - "description": "The schema of the Batch event payload.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The schema version of the Batch event payload.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventTarget", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json index da836ff167..308e3068d6 100644 --- a/cps-ncmp-events/src/main/resources/schemas/async/batch-event-schema-1.0.0.json +++ b/cps-ncmp-events/src/main/resources/schemas/async/data-operation-event-schema-1.0.0.json @@ -1,19 +1,18 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events.async:batch-event-schema:1.0.0", - "$ref": "#/definitions/BatchDataResponseEvent", + "$id": "urn:cps:org.onap.cps.ncmp.events.async:data-operation-event-schema:1.0.0", + "$ref": "#/definitions/DataOperationEvent", "definitions": { - "BatchDataResponseEvent": { - "description": "The payload of batch event.", + "DataOperationEvent": { + "description": "The payload of data operation event.", "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataResponseEventV1", + "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent", "properties": { - "event": { + "data": { "description": "The payload content of the requested data.", "type": "object", - "javaType" : "org.onap.cps.ncmp.events.async.BatchDataEvent", "properties": { - "batch-responses": { + "responses": { "description": "An array of batch responses which contains both success and failure", "type": "array", "items": { @@ -27,15 +26,15 @@ "description": "Id's of the cmhandles", "type": "array" }, - "status-code": { + "statusCode": { "description": "which says success or failure (0-99) are for success and (100-199) are for failure", "type": "string" }, - "status-message": { + "statusMessage": { "description": "Human readable message, Which says what the response has", "type": "string" }, - "data": { + "responseContent": { "description": "Contains the requested data response.", "type": "object", "existingJavaType": "java.lang.Object", @@ -45,21 +44,21 @@ "required": [ "operationId", "ids", - "status-code", - "status-message" + "statusCode", + "statusMessage" ], "additionalProperties": false } } }, "required": [ - "batch-responses" + "responses" ], "additionalProperties": false } }, "required": [ - "event" + "data" ], "additionalProperties": false } diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json deleted file mode 100644 index ea1e617c82..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-header-schema:v1", - "$ref": "#/definitions/AvcEventHeader", - "definitions": { - "AvcEventHeader": { - "description": "The header for AVC event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader", - "properties": { - "eventId": { - "description": "The unique id identifying the event generated by DMI for this AVC event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id passed by NCMP for this AVC event.", - "type": "string" - }, - "eventTime": { - "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", - "type": "string" - }, - "eventSource": { - "description": "The source of the AVC event.", - "type": "string" - }, - "eventType": { - "description": "The type of the AVC event.", - "type": "string" - }, - "eventSchema": { - "description": "The event schema for AVC events.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The event schema version for AVC events.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -}
\ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json index 7e975c9b93..a5bed939bf 100644 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json +++ b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json @@ -1,10 +1,9 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:1.0.0", "$ref": "#/definitions/AvcEvent", "definitions": { "Edit": { - "javaType": "org.onap.cps.ncmp.events.avc.v1.Edit", "additionalProperties": false, "properties": { "edit-id": { @@ -48,9 +47,9 @@ "AvcEvent": { "description": "The payload for AVC event.", "type": "object", - "javaType": "org.onap.cps.ncmp.events.avc.v1.AvcEvent", + "javaType": "org.onap.cps.ncmp.events.avc1_0_0.AvcEvent", "properties": { - "event": { + "data": { "description": "The AVC event content compliant with RFC8641 format", "type": "object", "additionalProperties": false, @@ -99,7 +98,7 @@ } }, "required": [ - "event" + "data" ], "additionalProperties": false } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java index b343d70a7a..9e2b66a2c1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/BatchRecordFilterStrategy.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java @@ -22,17 +22,17 @@ package org.onap.cps.ncmp.api.impl.async; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.common.header.Header; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.adapter.RecordFilterStrategy; /** - * Batch Record filter strategy, which helps to filter the consumer records. + * Data operation record filter strategy, which helps to filter the consumer records. * */ @Configuration -public class BatchRecordFilterStrategy { +public class DataOperationRecordFilterStrategy { /** * Filtering the consumer records based on the eventType header, It @@ -41,7 +41,7 @@ public class BatchRecordFilterStrategy { * @return boolean value. */ @Bean - public RecordFilterStrategy<String, BatchDataResponseEventV1> filterBatchDataResponseEvent() { + public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() { return consumedRecord -> { final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType"); if (eventTypeHeader == null) { @@ -49,7 +49,7 @@ public class BatchRecordFilterStrategy { } final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value()); return !(eventTypeHeaderValue != null - && eventTypeHeaderValue.startsWith("org.onap.cps.ncmp.events.async.BatchDataResponseEvent")); + && eventTypeHeaderValue.contains("DataOperationEvent")); }; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java index 2a332d0037..995a4d5a67 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java @@ -25,40 +25,41 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1; +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** - * Listener for cps-ncmp async batch events. + * Listener for cps-ncmp async data operation events. */ @Component @Slf4j @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class NcmpAsyncBatchEventConsumer { +public class NcmpAsyncDataOperationEventConsumer { - private final EventsPublisher<BatchDataResponseEventV1> eventsPublisher; + private final EventsPublisher<DataOperationEvent> eventsPublisher; /** - * Consume the BatchDataResponseEvent published by producer to topic 'async-m2m.topic' + * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic' * and publish the same to the client specified topic. * - * @param batchEventConsumerRecord consuming event as a ConsumerRecord. + * @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord. */ @KafkaListener( topics = "${app.ncmp.async-m2m.topic}", - filter = "filterBatchDataResponseEvent", - groupId = "ncmp-batch-event-group", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async.BatchDataResponseEventV1"}) - public void consumeAndPublish(final ConsumerRecord<String, BatchDataResponseEventV1> batchEventConsumerRecord) { - log.info("Consuming event payload {} ...", batchEventConsumerRecord.value()); + filter = "includeDataOperationEventsOnly", + groupId = "ncmp-data-operation-event-group", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"}) + public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent> + dataOperationEventConsumerRecord) { + log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value()); final String eventTarget = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventTarget").value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value()); final String eventId = SerializationUtils - .deserialize(batchEventConsumerRecord.headers().lastHeader("eventId").value()); - eventsPublisher.publishEvent(eventTarget, eventId, batchEventConsumerRecord.headers(), - batchEventConsumerRecord.value()); + .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value()); + eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(), + dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 7b28b4cd5f..e61e7729be 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -51,6 +51,19 @@ public class EventsPublisher<T> { private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate; /** + * Generic CloudEvent publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final ListenableFuture<SendResult<String, CloudEvent>> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + + /** * Generic Event publisher. * * @param topicName valid topic name @@ -95,7 +108,7 @@ public class EventsPublisher<T> { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) { + private ListenableFutureCallback<SendResult<String, ?>> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { @@ -103,7 +116,7 @@ public class EventsPublisher<T> { } @Override - public void onSuccess(final SendResult<String, T> sendResult) { + public void onSuccess(final SendResult<String, ?> sendResult) { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index f37497abe6..b5ca176d1d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,19 +20,17 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,34 +45,19 @@ public class AvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher<AvcEvent> eventsPublisher; - private final AvcEventMapper avcEventMapper; - + private final EventsPublisher<CloudEvent> eventsPublisher; /** * Incoming AvcEvent in the form of Consumer Record. * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) - public void consumeAndForward(final ConsumerRecord<String, AvcEvent> avcEventConsumerRecord) { + @KafkaListener(topics = "${app.dmi.cm-events.topic}") + public void consumeAndForward(final ConsumerRecord<String, CloudEvent> avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); - final String mutatedEventId = UUID.randomUUID().toString(); - mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); - eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), - outgoingAvcEvent); - } - - private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { - final String eventId = "eventId"; - final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); - eventHeaders.remove(eventId); - log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, - mutatedEventId); - eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); - + final String newEventId = UUID.randomUUID().toString(); + final CloudEvent outgoingAvcEvent = + CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java deleted file mode 100644 index 8246ed4802..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avc; - -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; - - -/** - * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. - */ -@Mapper(componentModel = "spring") -public interface AvcEventMapper { - - AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy index 02071cd8cf..d9b9ce6db0 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy @@ -28,7 +28,7 @@ import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1 +import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean @@ -37,43 +37,41 @@ import org.springframework.boot.test.context.SpringBootTest import org.springframework.kafka.listener.adapter.RecordFilterStrategy import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers - import java.time.Duration -@SpringBootTest(classes = [EventsPublisher, NcmpAsyncBatchEventConsumer, BatchRecordFilterStrategy,JsonObjectMapper, - ObjectMapper]) +@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper]) @Testcontainers @DirtiesContext -class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { +class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher) + NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher) @Autowired JsonObjectMapper jsonObjectMapper @Autowired - RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy + RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer)) def static clientTopic = 'client-topic' - def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1' + def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent' def 'Consume and publish event to client specified topic'() { given: 'consumer subscribing to client topic' legacyEventKafkaConsumer.subscribe([clientTopic]) - and: 'consumer record for batch event' - def consumerRecordIn = createConsumerRecord(batchEventType) - when: 'the batch event is consumed and published to client specified topic' - asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn) + and: 'consumer record for data operation event' + def consumerRecordIn = createConsumerRecord(dataOperationType) + when: 'the data operation event is consumed and published to client specified topic' + asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn) and: 'the client specified topic is polled' def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] then: 'verifying consumed event operationID is same as published event operationID' - def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId - def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId + def operationIdIn = consumerRecordIn.value.data.responses[0].operationId + def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId assert operationIdIn == operationIdOut } @@ -85,14 +83,14 @@ class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec { then: 'the event is #description' assert result == expectedResult where: 'filter the event based on the eventType #eventType' - description | eventType || expectedResult - 'not filtered(the consumer will see the event)' | batchEventType || false - 'filtered(the consumer will not see the event)' | 'wrongType' || true + description | eventType || expectedResult + 'not filtered(the consumer will see the event)' | dataOperationType || false + 'filtered(the consumer will not see the event)' | 'wrongType' || true } def createConsumerRecord(eventTypeAsString) { - def jsonData = TestUtils.getResourceFileContent('batchDataEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, BatchDataResponseEventV1.class) + def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class) def eventTarget = SerializationUtils.serialize(clientTopic) def eventType = SerializationUtils.serialize(eventTypeAsString) def eventId = SerializationUtils.serialize('12345') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 3dffac714b..4a9e3ee811 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -21,21 +21,23 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.avc.v1.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext -import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -46,52 +48,49 @@ import java.time.Duration class AvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class) + EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper) + AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher) @Autowired JsonObjectMapper jsonObjectMapper - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription on a topic' def cmEventsTopicName = 'cm-events' acvEventConsumer.cmEventsTopicName = cmEventsTopicName - legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>) + cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('sample-eventid') + .withType('sample-test-type') + .withSource(URI.create('sample-test-source')) + .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord<String,AvcEvent>(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) - consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) + def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) when: 'the event is consumed' acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' - def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) + def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + def cloudevent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' - record.headers().forEach(header -> { - if (header.key().equals('eventCorrelationId')) { - assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' - record.headers().forEach(header -> { - if (header.key().equals('eventId')) { - assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid' and: 'the event payload still matches' assert testEventSent == convertedAvcEvent } diff --git a/cps-ncmp-service/src/test/resources/batchDataEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json index 49eb273f58..42268c0ef3 100644 --- a/cps-ncmp-service/src/test/resources/batchDataEvent.json +++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json @@ -1,15 +1,15 @@ { - "event":{ - "batch-responses":[ + "data":{ + "responses":[ { "operationId":"1", "ids":[ "123", "124" ], - "status-code":1, - "status-message":"Batch operation success on the above cmhandle ids ", - "data":{ + "statusCode":1, + "statusMessage":"Batch operation success on the above cmhandle ids ", + "responseContent":{ "ietf-netconf-monitoring:netconf-state":{ "schemas":{ "schema":[ @@ -26,20 +26,6 @@ } } } - }, - { - "operationId":"101", - "ids":[ - "456", - "457" - ], - "status-code":101, - "status-message":"cmHandle(s) do not exist", - "data":{ - "error":{ - "message":"cmHandle(s) do not exist" - } - } } ] } diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index 569343fed9..5b297c86c2 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,5 +1,5 @@ { - "event":{ + "data":{ "push-change-update":{ "datastore-changes":{ "ietf-yang-patch:yang-patch":{ diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/GetPerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/GetPerfTest.groovy index d20da46cc5..eee87dd7c0 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/GetPerfTest.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/GetPerfTest.groovy @@ -44,9 +44,9 @@ class GetPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Read datatrees with ${scenario}", durationLimit, durationInMillis) where: 'the following parameters are used' scenario | fetchDescendantsOption | anchor || durationLimit | expectedNumberOfDataNodes - 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 100 | 1 - 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 150 | 1 + 50 - 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 600 | 1 + 50 * 86 + 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 50 | 1 + 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 100 | 1 + 50 + 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 200 | 1 + 50 * 86 } def 'Read data trees for multiple xpaths'() { @@ -59,7 +59,7 @@ class GetPerfTest extends CpsPerfTestBase { assert countDataNodesInTree(result) == 50 * 86 def durationInMillis = stopWatch.getTotalTimeMillis() then: 'all data is read within 500 ms' - recordAndAssertPerformance("Read datatrees for multiple xpaths", 500, durationInMillis) + recordAndAssertPerformance("Read datatrees for multiple xpaths", 200, durationInMillis) } def 'Read complete data trees using #scenario.'() { @@ -75,10 +75,10 @@ class GetPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Read datatrees using ${scenario}", durationLimit, durationInMillis) where: 'the following xpaths are used' scenario | anchorPrefix | xpath || durationLimit | expectedNumberOfDataNodes - 'bookstore root' | 'bookstore' | '/' || 300 | 78 - 'bookstore top element' | 'bookstore' | '/bookstore' || 300 | 78 - 'openroadm root' | 'openroadm' | '/' || 1200 | 1 + 50 * 86 - 'openroadm top element' | 'openroadm' | '/openroadm-devices' || 1200 | 1 + 50 * 86 + 'bookstore root' | 'bookstore' | '/' || 200 | 78 + 'bookstore top element' | 'bookstore' | '/bookstore' || 200 | 78 + 'openroadm root' | 'openroadm' | '/' || 600 | 1 + 50 * 86 + 'openroadm top element' | 'openroadm' | '/openroadm-devices' || 600 | 1 + 50 * 86 } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/QueryPerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/QueryPerfTest.groovy index 885f1c2038..eafd16f346 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/QueryPerfTest.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/QueryPerfTest.groovy @@ -45,10 +45,10 @@ class QueryPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Query 1 anchor ${scenario}", durationLimit, durationInMillis) where: 'the following parameters are used' scenario | anchor | cpsPath || durationLimit | expectedNumberOfDataNodes - 'top element' | 'openroadm1' | '/openroadm-devices' || 500 | 50 * 86 + 1 - 'leaf condition' | 'openroadm2' | '//openroadm-device[@ne-state="inservice"]' || 500 | 50 * 86 - 'ancestors' | 'openroadm3' | '//openroadm-device/ancestor::openroadm-devices' || 500 | 50 * 86 + 1 - 'leaf condition + ancestors' | 'openroadm4' | '//openroadm-device[@status="success"]/ancestor::openroadm-devices' || 500 | 50 * 86 + 1 + 'top element' | 'openroadm1' | '/openroadm-devices' || 200 | 50 * 86 + 1 + 'leaf condition' | 'openroadm2' | '//openroadm-device[@ne-state="inservice"]' || 250 | 50 * 86 + 'ancestors' | 'openroadm3' | '//openroadm-device/ancestor::openroadm-devices' || 200 | 50 * 86 + 1 + 'leaf condition + ancestors' | 'openroadm4' | '//openroadm-device[@status="success"]/ancestor::openroadm-devices' || 200 | 50 * 86 + 1 } def 'Query complete data trees across all anchors with #scenario.'() { @@ -63,10 +63,10 @@ class QueryPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Query across anchors ${scenario}", durationLimit, durationInMillis) where: 'the following parameters are used' scenario | cpspath || durationLimit | expectedNumberOfDataNodes - 'top element' | '/openroadm-devices' || 2000 | 5 * (50 * 86 + 1) - 'leaf condition' | '//openroadm-device[@ne-state="inservice"]' || 2000 | 5 * (50 * 86) - 'ancestors' | '//openroadm-device/ancestor::openroadm-devices' || 2000 | 5 * (50 * 86 + 1) - 'leaf condition + ancestors' | '//openroadm-device[@status="success"]/ancestor::openroadm-devices' || 2000 | 5 * (50 * 86 + 1) + 'top element' | '/openroadm-devices' || 600 | 5 * (50 * 86 + 1) + 'leaf condition' | '//openroadm-device[@ne-state="inservice"]' || 1000 | 5 * (50 * 86) + 'ancestors' | '//openroadm-device/ancestor::openroadm-devices' || 600 | 5 * (50 * 86 + 1) + 'leaf condition + ancestors' | '//openroadm-device[@status="success"]/ancestor::openroadm-devices' || 600 | 5 * (50 * 86 + 1) } def 'Query with leaf condition and #scenario.'() { @@ -81,9 +81,9 @@ class QueryPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Query with ${scenario}", durationLimit, durationInMillis) where: 'the following parameters are used' scenario | fetchDescendantsOption | anchor || durationLimit | expectedNumberOfDataNodes - 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 100 | 50 - 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 200 | 50 * 2 - 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 500 | 50 * 86 + 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 60 | 50 + 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 120 | 50 * 2 + 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 200 | 50 * 86 } def 'Query ancestors with #scenario.'() { @@ -98,9 +98,9 @@ class QueryPerfTest extends CpsPerfTestBase { recordAndAssertPerformance("Query ancestors with ${scenario}", durationLimit, durationInMillis) where: 'the following parameters are used' scenario | fetchDescendantsOption | anchor || durationLimit | expectedNumberOfDataNodes - 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 100 | 1 - 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 200 | 1 + 50 - 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 500 | 1 + 50 * 86 + 'no descendants' | OMIT_DESCENDANTS | 'openroadm1' || 60 | 1 + 'direct descendants' | DIRECT_CHILDREN_ONLY | 'openroadm2' || 120 | 1 + 50 + 'all descendants' | INCLUDE_ALL_DESCENDANTS | 'openroadm3' || 200 | 1 + 50 * 86 } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/UpdatePerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/UpdatePerfTest.groovy index c281908653..a02d21c41a 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/UpdatePerfTest.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/cps/UpdatePerfTest.groovy @@ -40,7 +40,7 @@ class UpdatePerfTest extends CpsPerfTestBase { stopWatch.stop() def updateDurationInMillis = stopWatch.getTotalTimeMillis() then: 'update duration is under 1000 milliseconds' - recordAndAssertPerformance('Update 1 data node', 1000, updateDurationInMillis) + recordAndAssertPerformance('Update 1 data node', 600, updateDurationInMillis) } def 'Batch update 10 data nodes with descendants'() { @@ -56,7 +56,7 @@ class UpdatePerfTest extends CpsPerfTestBase { stopWatch.stop() def updateDurationInMillis = stopWatch.getTotalTimeMillis() then: 'update duration is under 5000 milliseconds' - recordAndAssertPerformance('Update 10 data nodes', 5000, updateDurationInMillis) + recordAndAssertPerformance('Update 10 data nodes', 4000, updateDurationInMillis) } } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/CmHandleQueryPerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/CmHandleQueryPerfTest.groovy index 5d7c9de51e..bcb2d2fe5d 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/CmHandleQueryPerfTest.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/CmHandleQueryPerfTest.groovy @@ -44,7 +44,7 @@ class CmHandleQueryPerfTest extends NcmpRegistryPerfTestBase { stopWatch.stop() def durationInMillis = stopWatch.getTotalTimeMillis() then: 'the required operations are performed within 1200 ms' - recordAndAssertPerformance("CpsPath Registry attributes Query", 1200, durationInMillis) + recordAndAssertPerformance("CpsPath Registry attributes Query", 500, durationInMillis) and: 'all but 1 (other node) are returned' result.size() == 999 and: 'the tree contains all the expected descendants too' |