diff options
author | JosephKeenan <joseph.keenan@est.tech> | 2022-05-24 18:59:25 +0100 |
---|---|---|
committer | JosephKeenan <joseph.keenan@est.tech> | 2022-05-25 10:47:34 +0100 |
commit | f31c7f8bd4985c84f9126d071439c1a4de57f704 (patch) | |
tree | 3b5d91b6357705304ae95fe1ad01156afbded020 | |
parent | 4cf4962b74765a5afe234aa258a9143ea6936f73 (diff) |
Async request response NCMP -> Client
-Added consumer for DMI events and producer for forwarding to client
-Added schemas for events
-Updated tests
-Added new module for ncmp events
-Used mapstruct for event mapping
Issue-ID: CPS-830
Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
28 files changed, 1164 insertions, 297 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 7ad22e977e..692996c985 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -70,6 +70,19 @@ spring: producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
client-id: cps-core
+ consumer:
+ group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
+ key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+ spring.json.value.default.type: org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
+ spring.json.use.type.headers: false
+app:
+ ncmp:
+ async-m2m:
+ topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
notification:
data-updated:
@@ -85,7 +98,7 @@ notification: queue-capacity: 500
wait-for-tasks-to-complete-on-shutdown: true
thread-name-prefix: Async-
-
+ time-out-value-in-ms: 2000
springdoc:
swagger-ui:
diff --git a/cps-bom/pom.xml b/cps-bom/pom.xml index e46892695e..f2fcb6ee0e 100644 --- a/cps-bom/pom.xml +++ b/cps-bom/pom.xml @@ -2,7 +2,7 @@ <!-- ============LICENSE_START======================================================= Copyright (C) 2020 Pantheon.tech - Modifications Copyright (C) 2021 Nordix Foundation + Modifications Copyright (C) 2021 - 2022 Nordix Foundation Modifications Copyright (C) 2021 Bell Canada. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); @@ -101,6 +101,11 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>cps-ncmp-events</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>checkstyle</artifactId> <version>${project.version}</version> </dependency> diff --git a/cps-ncmp-events/pom.xml b/cps-ncmp-events/pom.xml new file mode 100644 index 0000000000..2d49a4c011 --- /dev/null +++ b/cps-ncmp-events/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + Copyright (c) 2022 Nordix Foundation. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.cps</groupId> + <artifactId>cps-parent</artifactId> + <version>3.1.0-SNAPSHOT</version> + <relativePath>../cps-parent/pom.xml</relativePath> + </parent> + + <artifactId>cps-ncmp-events</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <sourceDirectory>${basedir}/src/main/resources/schemas</sourceDirectory> + <targetPackage>org.onap.cps.ncmp.event.model</targetPackage> + <generateBuilders>true</generateBuilders> + <serializable>true</serializable> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json new file mode 100644 index 0000000000..528c063826 --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json @@ -0,0 +1,87 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:dmi-async-request-response-event-schema:v1", + "$ref": "#/definitions/DmiAsyncRequestResponseEvent", + "definitions": { + "DmiAsyncRequestResponseEvent": { + "description": "The payload for NCMP async request response event.", + "type": "object", + "properties": { + "eventId": { + "description": "The unique id identifying the event generated by DMI.", + "type": "string" + }, + "eventCorrelationId": { + "description": "The request id passed by NCMP.", + "type": "string" + }, + "eventTime": { + "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", + "type": "string" + }, + "eventTarget": { + "description": "The target of the event.", + "type": "string" + }, + "eventType": { + "description": "The type of the event.", + "type": "string" + }, + "eventSchema": { + "description": "The event schema for async request response events.", + "type": "string" + }, + "eventSource": { + "description": "The source of the event.", + "type": "string" + }, + "eventContent": { + "$ref": "#/definitions/Event-Content" + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventTime", + "eventTarget", + "eventType", + "eventSchema", + "eventSource", + "eventContent" + ] + }, + "Event-Content": { + "description": "The event content.", + "type": "object", + "properties": { + "response-data-schema": { + "description": "The schema of response data", + "type": "string" + }, + "response-status": { + "description": "The status of the response.", + "type": "string" + }, + "response-code": { + "description": "The code of the response.", + "type": "string" + }, + "response-data": { + "description": "The data payload", + "type": "object", + "properties": { + "payload": { + "type": "object" + } + } + }, + "required": [ + "response-data-schema", + "response-status", + "response-code", + "response-data" + ] + } + } + } +} diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json new file mode 100644 index 0000000000..3fd15bd5d2 --- /dev/null +++ b/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json @@ -0,0 +1,187 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "urn:cps:org.onap.cps.ncmp.events:ncmp-async-request-response-event-schema:v1", + "$ref": "#/definitions/NcmpAsyncRequestResponseEvent", + "definitions": { + "NcmpAsyncRequestResponseEvent": { + "description": "The payload for CPS async request response event.", + "type": "object", + "properties": { + "eventId": { + "description": "The unique id identifying the event generated by DMI.", + "type": "string" + }, + "eventCorrelationId": { + "description": "The request id passed by NCMP.", + "type": "string" + }, + "eventTime": { + "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", + "type": "string" + }, + "eventTarget": { + "description": "The target of the event.", + "type": "string" + }, + "eventType": { + "description": "The type of the event.", + "type": "string" + }, + "eventSchema": { + "description": "The event schema for async request response events.", + "type": "string" + }, + "event": { + "$ref": "#/definitions/Event" + }, + "forwardedEvent": { + "$ref": "#/definitions/Forwarded-Event" + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventTime", + "eventTarget", + "eventType", + "eventSchema" + ] + }, + "Forwarded-Event": { + "description": "The event content.", + "type": "object", + "properties": { + "eventId": { + "description": "The unique id identifying the event generated by DMI.", + "type": "string" + }, + "eventCorrelationId": { + "description": "The request id passed by NCMP.", + "type": "string" + }, + "eventTime": { + "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", + "type": "string" + }, + "eventTarget": { + "description": "The target of the event.", + "type": "string" + }, + "eventType": { + "description": "The type of the event.", + "type": "string" + }, + "eventSchema": { + "description": "The event schema for async request response events.", + "type": "string" + }, + "eventSource": { + "description": "The source of the event.", + "type": "string" + }, + "response-data-schema": { + "description": "The received schema of response data", + "type": "string" + }, + "response-status": { + "description": "The received status of the response.", + "type": "string" + }, + "response-code": { + "description": "The received code of the response.", + "type": "string" + }, + "forwardedEventData": { + "description": "The data payload", + "type": "object", + "properties": { + "forwardedEventPayload": { + "type": "object" + } + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventTime", + "eventTarget", + "eventType", + "eventSchema", + "eventSource", + "response-data-schema", + "response-status", + "response-code", + "forwardedEventData" + ] + } + }, + "Event": { + "description": "The event content.", + "type": "object", + "properties": { + "eventId": { + "description": "The unique id identifying the event generated by DMI", + "type": "string" + }, + "eventCorrelationId": { + "description": "The request id passed by NCMP.", + "type": "string" + }, + "eventTime": { + "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", + "type": "string" + }, + "eventTarget": { + "description": "The target of the event.", + "type": "string" + }, + "eventType": { + "description": "The type of the event.", + "type": "string" + }, + "eventSchema": { + "description": "The event schema for async request response events.", + "type": "string" + }, + "eventSource": { + "description": "The source of the event.", + "type": "string" + }, + "response-data-schema": { + "description": "The received schema of response data", + "type": "string" + }, + "response-status": { + "description": "The received status of the response.", + "type": "string" + }, + "response-code": { + "description": "The received code of the response.", + "type": "string" + }, + "response-data": { + "description": "The data payload", + "type": "object", + "properties": { + "payload": { + "type": "object" + } + } + }, + "required": [ + "eventId", + "eventCorrelationId", + "eventTarget", + "eventTime", + "eventType", + "eventSchema", + "eventSource", + "response-data-schema", + "response-status", + "response-code", + "event-data" + ] + } + } + } +}
\ No newline at end of file diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index cedc94672c..11517bcc9e 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -46,6 +46,7 @@ import org.onap.cps.ncmp.api.impl.exception.InvalidTopicException; import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters; import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.rest.api.NetworkCmProxyApi; +import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor; import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper; import org.onap.cps.ncmp.rest.model.CmHandleProperties; import org.onap.cps.ncmp.rest.model.CmHandleProperty; @@ -61,6 +62,7 @@ import org.onap.cps.ncmp.rest.model.RestOutputCmHandle; import org.onap.cps.ncmp.rest.model.RestOutputCmHandlePublicProperties; import org.onap.cps.utils.CpsValidator; import org.onap.cps.utils.JsonObjectMapper; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; @@ -75,12 +77,14 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { private static final String NO_BODY = null; private static final String NO_REQUEST_ID = null; private static final String NO_TOPIC = null; - public static final String ASYNC_REQUEST_ID = "requestId"; - private final NetworkCmProxyDataService networkCmProxyDataService; private final JsonObjectMapper jsonObjectMapper; private final NcmpRestInputMapper ncmpRestInputMapper; private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper; + private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor; + + @Value("${notification.async.executor.time-out-value-in-ms:2000}") + private int timeOutInMilliSeconds; /** * Get resource data from operational datastore. @@ -96,19 +100,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery); - final Map<String, Object> asyncResponseData = asyncResponse.getBody(); + if (isValidTopic(topicParamInQuery)) { + final String requestId = UUID.randomUUID().toString(); + cpsNcmpTaskExecutor.executeTask(() -> + networkCmProxyDataService.getResourceDataOperationalForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, + requestId + ), timeOutInMilliSeconds + ); + return acknowledgeAsyncRequest(requestId); + } - final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(cmHandle, - resourceIdentifier, - optionsParamInQuery, - asyncResponseData == null ? NO_TOPIC : topicParamInQuery, - asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString()); + final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID); - if (asyncResponseData == null) { - return ResponseEntity.ok(responseObject); - } - return ResponseEntity.ok(asyncResponse); + return ResponseEntity.ok(responseObject); } /** @@ -125,19 +131,21 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery); - final Map<String, Object> asyncResponseData = asyncResponse.getBody(); + if (isValidTopic(topicParamInQuery)) { + final String resourceDataRequestId = UUID.randomUUID().toString(); + cpsNcmpTaskExecutor.executeTask(() -> + networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, + resourceDataRequestId + ), timeOutInMilliSeconds + ); + return acknowledgeAsyncRequest(resourceDataRequestId); + } - final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(cmHandle, - resourceIdentifier, - optionsParamInQuery, - asyncResponseData == null ? NO_TOPIC : topicParamInQuery, - asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString()); + final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID); - if (asyncResponseData == null) { - return ResponseEntity.ok(responseObject); - } - return ResponseEntity.ok(asyncResponse); + return ResponseEntity.ok(responseObject); } @Override @@ -319,18 +327,7 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { return restOutputCmHandle; } - private ResponseEntity<Map<String, Object>> populateAsyncResponse(final String topicParamInQuery) { - final boolean processAsynchronously = hasTopicParameter(topicParamInQuery); - final Map<String, Object> responseData; - if (processAsynchronously) { - responseData = getAsyncResponseData(); - } else { - responseData = null; - } - return ResponseEntity.ok().body(responseData); - } - - private static boolean hasTopicParameter(final String topicName) { + private static boolean isValidTopic(final String topicName) { if (topicName == null) { return false; } @@ -340,11 +337,11 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic"); } - private Map<String, Object> getAsyncResponseData() { - final Map<String, Object> asyncResponseData = new HashMap<>(1); - final String resourceDataRequestId = UUID.randomUUID().toString(); - asyncResponseData.put(ASYNC_REQUEST_ID, resourceDataRequestId); - return asyncResponseData; + private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) { + final Map<String, Object> acknowledgeData = new HashMap<>(1); + acknowledgeData.put("requestId", requestId); + return ResponseEntity.ok(acknowledgeData); } } + diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java new file mode 100644 index 0000000000..3e8929d2e3 --- /dev/null +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.rest.exceptions; + +import lombok.Getter; + +public class CpsTaskExecutionException extends RuntimeException { + + private static final long serialVersionUID = 1481520410918497454L; + + @Getter + final String details; + + /** + * Constructor. + * + * @param message the error message + * @param details the error details + * @param cause the cause of the exception + */ + public CpsTaskExecutionException(final String message, final String details, final Throwable cause) { + super(message, cause); + this.details = details; + } + +}
\ No newline at end of file diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java new file mode 100644 index 0000000000..93aa2858ca --- /dev/null +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.rest.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class CpsNcmpTaskExecutor { + + /** + * Execute task asynchronously and publish response to supplied topic. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param timeOutInMillis the time out value in milliseconds + */ + public void executeTask(final Supplier<Object> taskSupplier, final int timeOutInMillis) { + CompletableFuture.supplyAsync(taskSupplier::get) + .orTimeout(timeOutInMillis, MILLISECONDS) + .whenCompleteAsync( + (responseAsJson, throwable) -> { + handleTaskCompletion(throwable); + } + ); + } + + private void handleTaskCompletion(final Throwable throwable) { + if (throwable == null) { + log.info("Async task completed successfully."); + } else { + log.error("Async task failed. caused by : {}", throwable.getMessage()); + } + } +} + + + diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index 6cf1506681..3315304258 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -28,6 +28,7 @@ import org.onap.cps.ncmp.api.inventory.CmHandleState import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper +import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor import spock.lang.Shared import java.time.OffsetDateTime @@ -83,8 +84,8 @@ class NetworkCmProxyControllerSpec extends Specification { @SpringBean RestOutputCmHandleStateMapper restOutputCmHandleStateMapper = Mappers.getMapper(RestOutputCmHandleStateMapper) - def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") - .format(OffsetDateTime.of(2022, 12, 31, 20, 30, 40, 1, ZoneOffset.UTC)) + @SpringBean + CpsNcmpTaskExecutor spiedCpsTaskExecutor = Spy() @Value('${rest.api.ncmp-base-path}/v1') def ncmpBasePathV1 @@ -95,6 +96,9 @@ class NetworkCmProxyControllerSpec extends Specification { def NO_TOPIC = null def NO_REQUEST_ID = null + def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ") + .format(OffsetDateTime.of(2022, 12, 31, 20, 30, 40, 1, ZoneOffset.UTC)) + def 'Get Resource Data from pass-through operational.'() { given: 'resource data url' def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:passthrough-operational" + @@ -120,34 +124,40 @@ class NetworkCmProxyControllerSpec extends Specification { "?resourceIdentifier=parent/child&options=(a=1,b=2)${topicQueryParam}" when: 'get data resource request is performed' def response = mvc.perform( - get(getUrl) - .contentType(MediaType.APPLICATION_JSON) - ).andReturn().response - then: 'the NCMP data service is called with operational data for cm handle' - expectedNumberOfMethodExecutions - * mockNetworkCmProxyDataService."${expectedMethodName}"('testCmHandle', - 'parent/child', - '(a=1,b=2)', - expectedTopicName, - _) - then: 'response status is expected' - response.status == expectedHttpStatus + get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response + then: 'task executor is called appropriate number of times' + expectedNumberOfExecutorExecutions * spiedCpsTaskExecutor.executeTask(_, 2000) + and: 'response status is expected' + response.status == HttpStatus.OK.value() + where: 'the following parameters are used' + scenario | datastoreInUrl | topicQueryParam || expectedTopicName | expectedNumberOfExecutorExecutions + 'url with valid topic' | 'passthrough-operational' | '&topic=my-topic-name' || 'my-topic-name' | 1 + 'no topic in url' | 'passthrough-operational' | '' || NO_TOPIC | 0 + 'null topic in url' | 'passthrough-operational' | '&topic=null' || 'null' | 1 + 'url with valid topic' | 'passthrough-running' | '&topic=my-topic-name' || 'my-topic-name' | 1 + 'no topic in url' | 'passthrough-running' | '' || NO_TOPIC | 0 + 'null topic in url' | 'passthrough-running' | '&topic=null' || 'null' | 1 + } + + def 'Fail to get Resource Data from #datastoreInUrl when #scenario.'() { + given: 'resource data url' + def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:${datastoreInUrl}" + + "?resourceIdentifier=parent/child&options=(a=1,b=2)${topicQueryParam}" + when: 'get data resource request is performed' + def response = mvc.perform( + get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response + then: 'abad request is returned' + response.status == HttpStatus.BAD_REQUEST.value() where: 'the following parameters are used' - scenario | datastoreInUrl | topicQueryParam || expectedTopicName | expectedMethodName | expectedNumberOfMethodExecutions | expectedHttpStatus - 'url with valid topic' | 'passthrough-operational' | '&topic=my-topic-name' || 'my-topic-name' | 'getResourceDataOperationalForCmHandle' | 1 | HttpStatus.OK.value() - 'no topic in url' | 'passthrough-operational' | '' || NO_TOPIC | 'getResourceDataOperationalForCmHandle' | 1 | HttpStatus.OK.value() - 'null topic in url' | 'passthrough-operational' | '&topic=null' || 'null' | 'getResourceDataOperationalForCmHandle' | 1 | HttpStatus.OK.value() - 'empty topic in url' | 'passthrough-operational' | '&topic=\"\"' || null | 'getResourceDataOperationalForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'missing topic in url' | 'passthrough-operational' | '&topic=' || null | 'getResourceDataOperationalForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'blank topic value in url' | 'passthrough-operational' | '&topic=\" \"' || null | 'getResourceDataOperationalForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'invalid non-empty topic value in url' | 'passthrough-operational' | '&topic=1_5_*_#' || null | 'getResourceDataOperationalForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'url with valid topic' | 'passthrough-running' | '&topic=my-topic-name' || 'my-topic-name' | 'getResourceDataPassThroughRunningForCmHandle' | 1 | HttpStatus.OK.value() - 'no topic in url' | 'passthrough-running' | '' || NO_TOPIC | 'getResourceDataPassThroughRunningForCmHandle' | 1 | HttpStatus.OK.value() - 'null topic in url' | 'passthrough-running' | '&topic=null' || 'null' | 'getResourceDataPassThroughRunningForCmHandle' | 1 | HttpStatus.OK.value() - 'empty topic in url' | 'passthrough-running' | '&topic=\"\"' || null | 'getResourceDataPassThroughRunningForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'missing topic in url' | 'passthrough-running' | '&topic=' || null | 'getResourceDataPassThroughRunningForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'blank topic value in url' | 'passthrough-running' | '&topic=\" \"' || null | 'getResourceDataPassThroughRunningForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() - 'invalid non-empty topic value in url' | 'passthrough-running' | '&topic=1_5_*_#' || null | 'getResourceDataPassThroughRunningForCmHandle' | 0 | HttpStatus.BAD_REQUEST.value() + scenario | datastoreInUrl | topicQueryParam + 'empty topic in url' | 'passthrough-operational' | '&topic=\"\"' + 'missing topic in url' | 'passthrough-operational' | '&topic=' + 'blank topic value in url' | 'passthrough-operational' | '&topic=\" \"' + 'invalid non-empty topic value in url' | 'passthrough-operational' | '&topic=1_5_*_#' + 'empty topic in url' | 'passthrough-running' | '&topic=\"\"' + 'missing topic in url' | 'passthrough-running' | '&topic=' + 'blank topic value in url' | 'passthrough-running' | '&topic=\" \"' + 'invalid non-empty topic value in url' | 'passthrough-running' | '&topic=1_5_*_#' } def 'Get Resource Data from pass-through running with #scenario value in resource identifier param.'() { diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy index 1258e6e1c4..fd3203b5b1 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy @@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.impl.exception.HttpClientRequestException import org.onap.cps.ncmp.api.impl.exception.ServerNcmpException import org.onap.cps.ncmp.rest.controller.NcmpRestInputMapper import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper +import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor import org.onap.cps.spi.exceptions.CpsException import org.onap.cps.spi.exceptions.DataNodeNotFoundException import org.onap.cps.spi.exceptions.DataValidationException @@ -62,7 +63,7 @@ class NetworkCmProxyRestExceptionHandlerSpec extends Specification { NetworkCmProxyDataService mockNetworkCmProxyDataService = Mock() @SpringBean - JsonObjectMapper jsonObjectMapper = Stub() + JsonObjectMapper stubbedJsonObjectMapper = Stub() @SpringBean NcmpRestInputMapper ncmpRestInputMapper = Mappers.getMapper(NcmpRestInputMapper) @@ -70,6 +71,9 @@ class NetworkCmProxyRestExceptionHandlerSpec extends Specification { @SpringBean RestOutputCmHandleStateMapper restOutputCmHandleStateMapper = Mappers.getMapper(RestOutputCmHandleStateMapper) + @SpringBean + CpsNcmpTaskExecutor stubbedCpsTaskExecutor = Stub() + @Value('${rest.api.ncmp-base-path}') def basePathNcmp diff --git a/cps-ncmp-rest/src/test/resources/application.yml b/cps-ncmp-rest/src/test/resources/application.yml index f2ca8c759b..0241696c5b 100644 --- a/cps-ncmp-rest/src/test/resources/application.yml +++ b/cps-ncmp-rest/src/test/resources/application.yml @@ -21,3 +21,8 @@ rest: api: ncmp-base-path: /ncmp ncmp-inventory-base-path: /ncmpInventory + +notification: + async: + executor: + time-out-value-in-ms: 2000
\ No newline at end of file diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml index 573c76e4a8..45112de173 100644 --- a/cps-ncmp-service/pom.xml +++ b/cps-ncmp-service/pom.xml @@ -38,6 +38,23 @@ <artifactId>cps-service</artifactId> </dependency> <dependency> + <groupId>org.onap.cps</groupId> + <artifactId>cps-ncmp-events</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-web</artifactId> + </dependency> + <dependency> + <groupId>org.mapstruct</groupId> + <artifactId>mapstruct</artifactId> + </dependency> + <dependency> + <groupId>org.mapstruct</groupId> + <artifactId>mapstruct-processor</artifactId> + </dependency> + <!-- T E S T - D E P E N D E N C I E S --> + <dependency> <groupId>org.spockframework</groupId> <artifactId>spock-core</artifactId> <scope>test</scope> @@ -48,6 +65,16 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>spock</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> @@ -58,13 +85,5 @@ </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-web</artifactId> - </dependency> - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-validation</artifactId> - </dependency> </dependencies> </project> diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java new file mode 100644 index 0000000000..4e5c57ba57 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * Listener for cps-ncmp async request response events. + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class NcmpAsyncRequestResponseEventConsumer { + + private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer; + private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper; + + /** + * Consume the specified event. + * + * @param dmiAsyncRequestResponseEvent the event to be consumed and produced. + */ + @KafkaListener(topics = "${app.ncmp.async-m2m.topic}") + public void consumeAndForward(final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) { + log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); + + final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent = + ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent); + ncmpAsyncRequestResponseEventProducer.sendMessage( + ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java new file mode 100644 index 0000000000..5d8ac7f841 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.UUID; +import org.mapstruct.AfterMapping; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.MappingTarget; +import org.mapstruct.Named; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; + +/** + * Mapper for converting DmiAsyncRequestResponseEvent to NcmpAsyncRequestResponseEvent. + */ +@Mapper(componentModel = "spring") +public interface NcmpAsyncRequestResponseEventMapper { + + @Mapping(source = "eventId", target = "eventId", qualifiedByName = "ncmpAsyncEventId") + @Mapping(source = "eventTime", target = "eventTime", qualifiedByName = "currentTime") + @Mapping(source = "eventId", target = "forwardedEvent.eventId") + @Mapping(source = "eventCorrelationId", target = "forwardedEvent.eventCorrelationId") + @Mapping(source = "eventSchema", target = "forwardedEvent.eventSchema") + @Mapping(source = "eventSource", target = "forwardedEvent.eventSource") + @Mapping(source = "eventTarget", target = "forwardedEvent.eventTarget") + @Mapping(source = "eventTime", target = "forwardedEvent.eventTime") + @Mapping(source = "eventType", target = "forwardedEvent.eventType") + @Mapping(source = "eventContent.responseStatus", target = "forwardedEvent.responseStatus") + @Mapping(source = "eventContent.responseCode", target = "forwardedEvent.responseCode") + @Mapping(source = "eventContent.responseDataSchema", target = "forwardedEvent.responseDataSchema") + NcmpAsyncRequestResponseEvent toNcmpAsyncEvent(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent); + + @Named("ncmpAsyncEventId") + static String getNcmpAsyncEventId(String eventId) { + return UUID.randomUUID().toString(); + } + + @Named("currentTime") + static String getFormattedCurrentTime(String eventTime) { + return ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + } + + @AfterMapping + default void mapAdditionalProperties(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent, + @MappingTarget NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) { + ncmpAsyncRequestResponseEvent.getForwardedEvent().setAdditionalProperty("response-data", + dmiAsyncRequestResponseEvent.getEventContent().getResponseData().getAdditionalProperties()); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java new file mode 100644 index 0000000000..8ab6db9045 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class NcmpAsyncRequestResponseEventProducer { + + private final KafkaTemplate<String, NcmpAsyncRequestResponseEvent> kafkaTemplate; + + + /** + * Sends message to the configured topic with a message key. + * + * @param eventId message key + * @param ncmpAsyncRequestResponseEvent message payload + */ + public void sendMessage(final String eventId, final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) { + kafkaTemplate.send(ncmpAsyncRequestResponseEvent.getEventTarget(), eventId, ncmpAsyncRequestResponseEvent); + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy new file mode 100644 index 0000000000..aa6bf1a783 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.spock.Testcontainers +import org.testcontainers.utility.DockerImageName + +import java.time.Duration +import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.utils.JsonObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.cps.ncmp.utils.TestUtils; +import org.springframework.boot.test.context.SpringBootTest +import org.spockframework.spring.SpringBean +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import spock.lang.Specification + +@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer]) +@Testcontainers +@DirtiesContext +class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification { + + static kafkaTestContainer = new KafkaContainer( + DockerImageName.parse('confluentinc/cp-kafka:6.2.1') + ) + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + def producerConfigProperties = [ + (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ProducerConfig.RETRIES_CONFIG) : 0, + (ProducerConfig.BATCH_SIZE_CONFIG) : 16384, + (ProducerConfig.LINGER_MS_CONFIG) : 1, + (ProducerConfig.BUFFER_MEMORY_CONFIG) : 33554432, + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) : StringSerializer, + (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer + ] + + def consumerConfigProperties = [ + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) : StringDeserializer, + (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer, + (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) : 'earliest', + (ConsumerConfig.GROUP_ID_CONFIG) : 'test' + ] + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties)) + + @SpringBean + NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService = + new NcmpAsyncRequestResponseEventProducer(kafkaTemplate); + + @SpringBean + NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper = + Mappers.getMapper(NcmpAsyncRequestResponseEventMapper.class) + + @SpringBean + NcmpAsyncRequestResponseEventConsumer ncmpAsyncRequestResponseEventConsumer = + new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService, + ncmpAsyncRequestResponseEventMapper) + + def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + + def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties()) + + def 'Consume and forward valid message'() { + given: 'consumer has a subscription' + kafkaConsumer.subscribe(['test-topic'] as List<String>) + and: 'an event is sent' + def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class) + when: 'the event is consumed' + ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent) + and: 'the topic is polled' + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + then: 'poll returns one record' + assert records.size() == 1 + and: 'consumed forwarded event id is the same as sent event id' + def record = records.iterator().next() + assert testEventSent.eventId.equalsIgnoreCase(jsonObjectMapper.convertJsonString(record.value(), + NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId()) + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy index 4c8dcace7d..964826be13 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy @@ -45,14 +45,14 @@ class DmiServiceUrlBuilderSpec extends Specification { def uriVars = objectUnderTest.populateUriVariables(yangModelCmHandle, "cmHandle", PASSTHROUGH_RUNNING) and: 'query params' - def uriQueries = objectUnderTest.populateQueryParams(resourceId, - 'optionsParamInQuery', topicParamInQuery) + def uriQueries = objectUnderTest.populateQueryParams(resourceId, + 'optionsParamInQuery', topic) when: 'a dmi datastore service url is generated' def dmiServiceUrl = objectUnderTest.getDmiDatastoreUrl(uriQueries, uriVars) then: 'service url is generated as expected' assert dmiServiceUrl == expectedDmiServiceUrl where: 'the following parameters are used' - scenario | topicParamInQuery | resourceId || expectedDmiServiceUrl + scenario | topic | resourceId || expectedDmiServiceUrl 'With valid resourceId' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty resourceId' | 'topicParamInQuery' | '' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty dmi base path' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' diff --git a/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json new file mode 100644 index 0000000000..bf6c86aaac --- /dev/null +++ b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json @@ -0,0 +1,30 @@ +{ + "eventId": "8dbfe0a7-3b28-4109-8fcb-9fbc9c37d56a", + "eventCorrelationId": "122ca20b-4f8c-4759-a2b4-f0b9456df204", + "eventTime": "2022-05-09T13:34:50.466+0000", + "eventSource": "org.onap.ncmp", + "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1", + "eventTarget": "test-topic", + "eventContent": { + "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1", + "response-status": "SUCCESS", + "response-code": "200", + "response-data": { + "ietf-netconf-monitoring:netconf-state": { + "schemas": { + "schema": [ + { + "identifier": "ietf-tls-server", + "version": "2016-11-02", + "format": "ietf-netconf-monitoring:yang", + "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server", + "location": [ + "NETCONF" + ] + } + ] + } + } + } + } +} diff --git a/cps-service/pom.xml b/cps-service/pom.xml index b9d6268746..1be45d1bce 100644 --- a/cps-service/pom.xml +++ b/cps-service/pom.xml @@ -1,165 +1,165 @@ -<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ============LICENSE_START=======================================================
- Copyright (C) 2021-2022 Nordix Foundation
- Modifications Copyright (C) 2021 Bell Canada.
- Modifications Copyright (C) 2021 Pantheon.tech
- ================================================================================
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
- SPDX-License-Identifier: Apache-2.0
- ============LICENSE_END=========================================================
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.onap.cps</groupId>
- <artifactId>cps-parent</artifactId>
- <version>3.1.0-SNAPSHOT</version>
- <relativePath>../cps-parent/pom.xml</relativePath>
- </parent>
-
- <artifactId>cps-service</artifactId>
-
- <properties>
- <minimum-coverage>0.94</minimum-coverage>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.onap.cps</groupId>
- <artifactId>cps-events</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-model-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-parser-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-parser-impl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-model-util</artifactId>
- </dependency>
- <!-- required for processing yang data in json format -->
- <dependency>
- <groupId>org.opendaylight.yangtools</groupId>
- <artifactId>yang-data-codec-gson</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-cache</artifactId>
- </dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-messaging</artifactId>
- </dependency>
- <dependency>
- <!-- For logging -->
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <!-- For dependency injection -->
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-validation</artifactId>
- </dependency>
- <dependency>
- <!-- For parsing JSON object -->
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- </dependency>
- <dependency>
- <groupId>net.logstash.logback</groupId>
- <artifactId>logstash-logback-encoder</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.janino</groupId>
- <artifactId>janino</artifactId>
- </dependency>
- <!-- T E S T D E P E N D E N C I E S -->
- <dependency>
- <groupId>org.codehaus.groovy</groupId>
- <artifactId>groovy</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.codehaus.groovy</groupId>
- <artifactId>groovy-json</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.spockframework</groupId>
- <artifactId>spock-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.spockframework</groupId>
- <artifactId>spock-spring</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>cglib</groupId>
- <artifactId>cglib-nodep</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.aspectj</groupId>
- <artifactId>aspectjrt</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
+<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + Copyright (C) 2021-2022 Nordix Foundation + Modifications Copyright (C) 2021 Bell Canada. + Modifications Copyright (C) 2021 Pantheon.tech + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + SPDX-License-Identifier: Apache-2.0 + ============LICENSE_END========================================================= +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.cps</groupId> + <artifactId>cps-parent</artifactId> + <version>3.1.0-SNAPSHOT</version> + <relativePath>../cps-parent/pom.xml</relativePath> + </parent> + + <artifactId>cps-service</artifactId> + + <properties> + <minimum-coverage>0.94</minimum-coverage> + </properties> + + <dependencies> + <dependency> + <groupId>org.onap.cps</groupId> + <artifactId>cps-events</artifactId> + </dependency> + <dependency> + <groupId>org.opendaylight.yangtools</groupId> + <artifactId>yang-model-api</artifactId> + </dependency> + <dependency> + <groupId>org.opendaylight.yangtools</groupId> + <artifactId>yang-parser-api</artifactId> + </dependency> + <dependency> + <groupId>org.opendaylight.yangtools</groupId> + <artifactId>yang-parser-impl</artifactId> + </dependency> + <dependency> + <groupId>org.opendaylight.yangtools</groupId> + <artifactId>yang-model-util</artifactId> + </dependency> + <!-- required for processing yang data in json format --> + <dependency> + <groupId>org.opendaylight.yangtools</groupId> + <artifactId>yang-data-codec-gson</artifactId> + </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-cache</artifactId> + </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-messaging</artifactId> + </dependency> + <dependency> + <!-- For logging --> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <!-- For dependency injection --> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-validation</artifactId> + </dependency> + <dependency> + <!-- For parsing JSON object --> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-aop</artifactId> + </dependency> + <dependency> + <groupId>net.logstash.logback</groupId> + <artifactId>logstash-logback-encoder</artifactId> + </dependency> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + </dependency> + <!-- T E S T D E P E N D E N C I E S --> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy-json</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.spockframework</groupId> + <artifactId>spock-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.spockframework</groupId> + <artifactId>spock-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>cglib</groupId> + <artifactId>cglib-nodep</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.kafka</groupId> + <artifactId>spring-kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.aspectj</groupId> + <artifactId>aspectjrt</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index 2f1067aafe..0772a8c9f6 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -22,6 +22,10 @@ package org.onap.cps.api.impl; +import static org.onap.cps.notification.Operation.CREATE; +import static org.onap.cps.notification.Operation.DELETE; +import static org.onap.cps.notification.Operation.UPDATE; + import java.time.OffsetDateTime; import java.util.Collection; import lombok.AllArgsConstructor; @@ -61,7 +65,7 @@ public class CpsDataServiceImpl implements CpsDataService { CpsValidator.validateNameCharacters(dataspaceName, anchorName); final DataNode dataNode = buildDataNode(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData); cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, ROOT_NODE_XPATH, Operation.CREATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, ROOT_NODE_XPATH, CREATE); } @Override @@ -70,7 +74,7 @@ public class CpsDataServiceImpl implements CpsDataService { CpsValidator.validateNameCharacters(dataspaceName, anchorName); final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.CREATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, CREATE); } @Override @@ -81,7 +85,7 @@ public class CpsDataServiceImpl implements CpsDataService { buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath, listElementDataNodeCollection); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE); } @Override @@ -98,7 +102,7 @@ public class CpsDataServiceImpl implements CpsDataService { final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves()); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE); } @Override @@ -113,7 +117,7 @@ public class CpsDataServiceImpl implements CpsDataService { for (final DataNode dataNodeUpdate : dataNodeUpdates) { processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate); } - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE); } @Override @@ -143,7 +147,7 @@ public class CpsDataServiceImpl implements CpsDataService { CpsValidator.validateNameCharacters(dataspaceName, anchorName); final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE); } @Override @@ -160,7 +164,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) { CpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE); } @Override @@ -168,7 +172,7 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { CpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, dataNodeXpath, Operation.DELETE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, dataNodeXpath, DELETE); } @Override @@ -177,7 +181,7 @@ public class CpsDataServiceImpl implements CpsDataService { CpsValidator.validateNameCharacters(dataspaceName, anchorName); final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName); - processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp); + processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp); } @Override @@ -185,7 +189,7 @@ public class CpsDataServiceImpl implements CpsDataService { final OffsetDateTime observedTimestamp) { CpsValidator.validateNameCharacters(dataspaceName, anchorName); cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath); - processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, listNodeXpath, Operation.DELETE); + processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, listNodeXpath, DELETE); } private DataNode buildDataNode(final String dataspaceName, final String anchorName, @@ -233,10 +237,13 @@ public class CpsDataServiceImpl implements CpsDataService { this.processDataUpdatedEventAsync(anchor, xpath, operation, observedTimestamp); } - private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath, final Operation operation, + private void processDataUpdatedEventAsync(final Anchor anchor, + final String xpath, + final Operation operation, final OffsetDateTime observedTimestamp) { try { - notificationService.processDataUpdatedEvent(anchor, observedTimestamp, xpath, operation); + notificationService.processDataUpdatedEvent(anchor, observedTimestamp, xpath, + operation); } catch (final Exception exception) { //If async message can't be queued for notification service, the initial request should not failed. log.error("Failed to send message to notification service", exception); diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy index 5124a519a7..05b9624a47 100644 --- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy @@ -33,7 +33,7 @@ class KafkaTestContainerConfig { // Not the best performance but it is good enough for test case private static synchronized KafkaContainer getKafkaContainer() { if (kafkaContainer == null) { - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1")) + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false") kafkaContainer.start() Runtime.getRuntime().addShutdownHook(new Thread(kafkaContainer::stop)) diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy index 6ef6874b33..8263c31f07 100644 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy @@ -29,7 +29,6 @@ import org.spockframework.spring.SpringSpy import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.test.context.SpringBootTest -import org.springframework.scheduling.annotation.EnableAsync import org.springframework.test.context.ContextConfiguration import spock.lang.Shared import spock.lang.Specification @@ -107,18 +106,18 @@ class NotificationServiceSpec extends Specification { 1 * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent) where: scenario | xpath | operation || expectedOperationInEvent - 'Same event is sent when root nodes' | '' | Operation.CREATE || Operation.CREATE - 'Same event is sent when root nodes' | '' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when root nodes' | '' | Operation.DELETE || Operation.DELETE - 'Same event is sent when root nodes' | '/' | Operation.CREATE || Operation.CREATE - 'Same event is sent when root nodes' | '/' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when root nodes' | '/' | Operation.DELETE || Operation.DELETE - 'Same event is sent when container nodes' | '/parent' | Operation.CREATE || Operation.CREATE - 'Same event is sent when container nodes' | '/parent' | Operation.UPDATE || Operation.UPDATE - 'Same event is sent when container nodes' | '/parent' | Operation.DELETE || Operation.DELETE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE || Operation.UPDATE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE || Operation.UPDATE - 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE || Operation.UPDATE + 'Same event is sent when root nodes' | '' | Operation.CREATE || Operation.CREATE + 'Same event is sent when root nodes' | '' | Operation.UPDATE || Operation.UPDATE + 'Same event is sent when root nodes' | '' | Operation.DELETE || Operation.DELETE + 'Same event is sent when root nodes' | '/' | Operation.CREATE || Operation.CREATE + 'Same event is sent when root nodes' | '/' | Operation.UPDATE || Operation.UPDATE + 'Same event is sent when root nodes' | '/' | Operation.DELETE || Operation.DELETE + 'Same event is sent when container nodes' | '/parent' | Operation.CREATE || Operation.CREATE + 'Same event is sent when container nodes' | '/parent' | Operation.UPDATE || Operation.UPDATE + 'Same event is sent when container nodes' | '/parent' | Operation.DELETE || Operation.DELETE + 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE || Operation.UPDATE + 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE || Operation.UPDATE + 'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE || Operation.UPDATE } def 'Error handling in notification service.'() { diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml index 436c3d4c34..a28b400834 100644 --- a/cps-service/src/test/resources/application.yml +++ b/cps-service/src/test/resources/application.yml @@ -1,5 +1,6 @@ # ============LICENSE_START======================================================= # Copyright (c) 2021 Bell Canada. +# Modification Copyright (C) 2022 Nordix Foundation. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/csit/plans/cps/setup.sh b/csit/plans/cps/setup.sh index d633b1ee26..59542402f2 100755 --- a/csit/plans/cps/setup.sh +++ b/csit/plans/cps/setup.sh @@ -1,6 +1,7 @@ #!/bin/bash # # Copyright 2016-2017 Huawei Technologies Co., Ltd. +# Modifications Copyright (C) 2022 Nordix Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,20 +65,7 @@ cd $WORKSPACE/archives/dc-cps curl -L https://github.com/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > docker-compose chmod +x docker-compose -# start CPS and PostgreSQL containers with docker compose -./docker-compose up -d - -###################### setup onap-dmi-plugin ############################ - -cd $WORKSPACE/archives -git clone "https://gerrit.onap.org/r/cps/ncmp-dmi-plugin" -mkdir -p $WORKSPACE/archives/dc-dmi -cat $WORKSPACE/archives/ncmp-dmi-plugin/docker-compose/docker-compose.yml -cp $WORKSPACE/archives/ncmp-dmi-plugin/docker-compose/*.yml $WORKSPACE/archives/dc-dmi -cd $WORKSPACE/archives/dc-dmi -# copy docker-compose (downloaded already for cps) -cp $WORKSPACE/archives/dc-cps/docker-compose . -chmod +x docker-compose +# start CPS/NCMP, DMI, and PostgreSQL containers with docker compose ./docker-compose up -d ###################### setup sdnc ####################################### diff --git a/csit/tests/ncmp-passthrough/ncmp-passthrough.robot b/csit/tests/ncmp-passthrough/ncmp-passthrough.robot index 32d9604e50..95a8d535c7 100644 --- a/csit/tests/ncmp-passthrough/ncmp-passthrough.robot +++ b/csit/tests/ncmp-passthrough/ncmp-passthrough.robot @@ -36,6 +36,13 @@ ${netconf} NETCONF *** Test Cases *** +Get for Passthrough Operational (CF, RO) with fields & topic + ${uri}= Set Variable ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-operational?resourceIdentifier=ietf-netconf-monitoring:netconf-state&options=(fields=schemas/schema)&topic=test-topic + ${headers}= Create Dictionary Authorization=${auth} + ${response}= Get On Session CPS_URL ${uri} headers=${headers} expected_status=200 + ${responseJson}= Set Variable ${response.json()} + Should Be Equal As Strings ${response.status_code} 200 + Get for Passthrough Operational (CF, RO) with fields ${uri}= Set Variable ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-operational?resourceIdentifier=ietf-netconf-monitoring:netconf-state&options=(fields=schemas/schema) ${headers}= Create Dictionary Authorization=${auth} @@ -136,4 +143,4 @@ Patch will add new category with new book and add a new book to an existing cate ${verifyUri}= Set Variable ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=stores:bookstore/categories=02/books=A%20New%20book%20in%20existing%20category ${verifyResponse}= Get On Session CPS_URL ${verifyUri} headers=${verifyHeaders} ${responseJson}= Set Variable ${verifyResponse.json()} - Should Be Equal As Strings ${verifyResponse.status_code} 200
\ No newline at end of file + Should Be Equal As Strings ${verifyResponse.status_code} 200 diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 44ebd3bc69..f2f477fe19 100755 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -1,7 +1,7 @@ # ============LICENSE_START======================================================= # Copyright (c) 2020 Pantheon.tech. # Modifications Copyright (C) 2021 Bell Canada. -# Modifications Copyright (C) 2021 Nordix Foundation +# Modifications Copyright (C) 2021-2022 Nordix Foundation. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,7 +64,7 @@ services: # - dbpostgresql # zookeeper: - # image: confluentinc/cp-zookeeper:6.1.1 + # image: confluentinc/cp-zookeeper:6.2.1 # environment: # ZOOKEEPER_CLIENT_PORT: 2181 # ZOOKEEPER_TICK_TIME: 2000 @@ -72,7 +72,7 @@ services: # - 22181:2181 # # kafka: - # image: confluentinc/cp-kafka:6.1.1 + # image: confluentinc/cp-kafka:6.2.1 # depends_on: # - zookeeper # ports: @@ -109,9 +109,58 @@ services: DB_PASSWORD: ${DB_PASSWORD:-cps} DMI_USERNAME: ${DMI_USERNAME:-cpsuser} DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} - #KAFKA_BOOTSTRAP_SERVER: kafka:9092 - #notification.data-updated.enabled: 'true' - #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' + KAFKA_BOOTSTRAP_SERVER: kafka:9092 + notification.data-updated.enabled: 'true' + NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' restart: unless-stopped depends_on: - - dbpostgresql
\ No newline at end of file + - dbpostgresql + + ### if kafka is not required comment out zookeeper and kafka ### + zookeeper: + image: confluentinc/cp-zookeeper:6.2.1 + container_name: zookeeper + ports: + - '2181:2181' + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:6.2.1 + container_name: kafka + ports: + - "19092:19092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + ### Comment out this section if dmi plugin is not required ### + ncmp-dmi-plugin: + container_name: ncmp-dmi-plugin + image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/ncmp-dmi-plugin:${DMI_VERSION:-1.2.0-SNAPSHOT-latest} + ports: + - ${DMI_PORT:-8783}:8080 + - ${DMI_MANAGEMENT_PORT:-8787}:8081 + environment: + CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser} + CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!} + CPS_CORE_HOST: ${CPS_CORE_HOST:-cps-and-ncmp} + CPS_CORE_PORT: ${CPS_CORE_PORT:-8080} + CPS_CORE_USERNAME: ${CPS_CORE_USERNAME:-cpsuser} + CPS_CORE_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!} + SDNC_HOST: ${SDNC_HOST:-sdnc} + SDNC_PORT: ${SDNC_PORT:-8181} + SDNC_USERNAME: ${SDNC_USERNAME:-admin} + SDNC_PASSWORD: ${SDNC_PASSWORD:-Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U} + DMI_SERVICE_URL: ${DMI_SERVICE_URL:-http://ncmp-dmi-plugin:8783} + DMI_USERNAME: ${DMI_USERNAME:-cpsuser} + DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} + KAFKA_BOOTSTRAP_SERVER: kafka:9092 + notification.data-updated.enabled: 'true' + NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' + restart: unless-stopped diff --git a/jacoco-report/pom.xml b/jacoco-report/pom.xml index d1181d367c..b8f18e734f 100644 --- a/jacoco-report/pom.xml +++ b/jacoco-report/pom.xml @@ -69,6 +69,7 @@ <exclude>org/onap/cps/ncmp/rest/model/*</exclude> <exclude>org/onap/cps/ncmp/rest/controller/*MapperImpl.class</exclude> <exclude>org/onap/cps/rest/controller/*MapperImpl.class</exclude> + <exclude>org/onap/cps/ncmp/api/impl/async/*MapperImpl.class</exclude> </excludes> </configuration> <executions> @@ -55,6 +55,7 @@ <module>cps-events</module>
<module>cps-service</module>
<module>cps-rest</module>
+ <module>cps-ncmp-events</module>
<module>cps-ncmp-service</module>
<module>cps-ncmp-rest</module>
<module>cps-path-parser</module>
|