diff options
17 files changed, 242 insertions, 98 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index e185bf23f7..39309d842b 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -71,9 +71,16 @@ spring: notification:
data-updated:
enabled: false
- topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
+ topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events}
filters:
- enabled-dataspaces: ${DATASPACE_FILTER_PATTERNS:""}
+ enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}
+ async-executor:
+ core-pool-size: 2
+ max-pool-size: 10
+ queue-capacity: 500
+ wait-for-tasks-to-complete-on-shutdown: true
+ thread-name-prefix: Async-
+
springdoc:
swagger-ui:
diff --git a/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java b/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java index 24454615a3..4228c0b934 100644 --- a/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java +++ b/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java @@ -38,7 +38,7 @@ public class LayeredArchitectureTest { private static final String NCMP_REST_PACKAGE = "org.onap.cps.ncmp.rest.."; private static final String API_SERVICE_PACKAGE = "org.onap.cps.api.."; private static final String SPI_SERVICE_PACKAGE = "org.onap.cps.spi.."; - private static final String NCMP_SERVICE_PACKAGE = "org.onap.cps.ncmp.api.."; + private static final String NCMP_SERVICE_PACKAGE = "org.onap.cps.ncmp.api.."; private static final String SPI_REPOSITORY_PACKAGE = "org.onap.cps.spi.repository.."; private static final String YANG_SCHEMA_PACKAGE = "org.onap.cps.yang.."; diff --git a/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json b/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json index de445ec722..95dc605da4 100644 --- a/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json +++ b/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json @@ -1,7 +1,7 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT", + "$id": "urn:cps:org.onap.cps:data-updated-event-schema:v1", "$ref": "#/definitions/CpsDataUpdatedEvent", @@ -12,10 +12,9 @@ "type": "object", "properties": { "schema": { - "description": "The schema, including its version, that this event adheres to.", + "description": "The schema, including its version, that this event adheres to. E.g. 'urn:cps:org.onap.cps:data-updated-event-schema:v99'.", "type": "string", - "default": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT", - "enum": ["urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT"] + "format": "uri" }, "id": { "description": "The unique id identifying the event for the specified source. Producer must ensure that source + id is unique for each distinct event.", @@ -69,7 +68,7 @@ } }, "required": [ - "timestamp", + "observedTimestamp", "dataspaceName", "schemaSetName", "anchorName", @@ -85,4 +84,4 @@ } -}
\ No newline at end of file +} diff --git a/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy b/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy index f72eceed1c..545b183378 100644 --- a/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy +++ b/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy @@ -33,13 +33,13 @@ class CpsDataUpdatedEventSpec extends Specification { def objectMapper = new ObjectMapper() final DATASPACE_NAME = 'my-dataspace' - final BOOKSTORE_SCHEMA_SET = 'bootstore-schemaset' + final BOOKSTORE_SCHEMA_SET = 'bookstore-schemaset' final ANCHOR_NAME = 'chapters' final EVENT_TIMESTAMP = '2020-12-01T00:00:00.000+0000' final EVENT_ID = '77b8f114-4562-4069-8234-6d059ff742ac' final EVENT_SOURCE = new URI('urn:cps:org.onap.cps') final EVENT_TYPE = 'org.onap.cps.data-updated-event' - final EVENT_SCHEMA = 'urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT' + final EVENT_SCHEMA = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') final DATA = [ 'test:bookstore': [ @@ -67,7 +67,7 @@ class CpsDataUpdatedEventSpec extends Specification { then: 'CpsDataUpdatedEvent POJO has the excepted values' cpsDataUpdatedEvent.id == EVENT_ID cpsDataUpdatedEvent.source == EVENT_SOURCE - cpsDataUpdatedEvent.schema.value() == EVENT_SCHEMA + cpsDataUpdatedEvent.schema == EVENT_SCHEMA cpsDataUpdatedEvent.type == EVENT_TYPE def content = cpsDataUpdatedEvent.content content.observedTimestamp == EVENT_TIMESTAMP @@ -90,8 +90,7 @@ class CpsDataUpdatedEventSpec extends Specification { and: 'CpsDataUpdatedEvent with the content' def cpsDataUpdateEvent = new CpsDataUpdatedEvent() cpsDataUpdateEvent - .withSchema( - CpsDataUpdatedEvent.Schema.fromValue(EVENT_SCHEMA)) + .withSchema(EVENT_SCHEMA) .withId(EVENT_ID) .withSource(EVENT_SOURCE) .withType(EVENT_TYPE) @@ -111,4 +110,4 @@ class CpsDataUpdatedEventSpec extends Specification { ) } -}
\ No newline at end of file +} diff --git a/cps-events/src/test/resources/bookstore-chapters.json b/cps-events/src/test/resources/bookstore-chapters.json index de46b71841..753426a60a 100644 --- a/cps-events/src/test/resources/bookstore-chapters.json +++ b/cps-events/src/test/resources/bookstore-chapters.json @@ -1,12 +1,12 @@ { - "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT", + "schema": "urn:cps:org.onap.cps:data-updated-event-schema:v1", "id": "77b8f114-4562-4069-8234-6d059ff742ac", "source": "urn:cps:org.onap.cps", "type": "org.onap.cps.data-updated-event", "content": { "observedTimestamp": "2020-12-01T00:00:00.000+0000", "dataspaceName": "my-dataspace", - "schemaSetName": "bootstore-schemaset", + "schemaSetName": "bookstore-schemaset", "anchorName": "chapters", "data": { "test:bookstore":{ @@ -31,4 +31,4 @@ } } } -}
\ No newline at end of file +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index ca5fa8e086..235030a84c 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -25,6 +25,7 @@ package org.onap.cps.ncmp.api.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -71,10 +72,11 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService /** * Constructor Injection for Dependencies. - * @param dmiOperations dmi operation - * @param cpsDataService Data Service Interface + * + * @param dmiOperations dmi operation + * @param cpsDataService Data Service Interface * @param cpsQueryService Query Service Interface - * @param objectMapper Object Mapper + * @param objectMapper Object Mapper */ public NetworkCmProxyDataServiceImpl(final DmiOperations dmiOperations, final CpsDataService cpsDataService, final CpsQueryService cpsQueryService, final ObjectMapper objectMapper) { @@ -96,7 +98,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService @Override public Collection<DataNode> queryDataNodes(final String cmHandle, final String cpsPath, - final FetchDescendantsOption fetchDescendantsOption) { + final FetchDescendantsOption fetchDescendantsOption) { return cpsQueryService.queryDataNodes(getDataspaceName(), cmHandle, cpsPath, fetchDescendantsOption); } @@ -141,7 +143,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService try { final List<PersistenceCmHandle> createdPersistenceCmHandles = new LinkedList<>(); - for (final CmHandle cmHandle: dmiPluginRegistration.getCreatedCmHandles()) { + for (final CmHandle cmHandle : dmiPluginRegistration.getCreatedCmHandles()) { createdPersistenceCmHandles.add(toPersistenceCmHandle(dmiPluginRegistration, cmHandle)); } final PersistenceCmHandlesList persistenceCmHandlesList = new PersistenceCmHandlesList(); @@ -161,7 +163,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService try { final List<PersistenceCmHandle> updatedPersistenceCmHandles = new LinkedList<>(); - for (final CmHandle cmHandle: dmiPluginRegistration.getUpdatedCmHandles()) { + for (final CmHandle cmHandle : dmiPluginRegistration.getUpdatedCmHandles()) { updatedPersistenceCmHandles.add(toPersistenceCmHandle(dmiPluginRegistration, cmHandle)); } final PersistenceCmHandlesList persistenceCmHandlesList = new PersistenceCmHandlesList(); @@ -190,56 +192,56 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService @Override public Object getResourceDataOperationalForCmHandle(final @NotNull String cmHandle, - final @NotNull String resourceIdentifier, - final String acceptParam, - final String fieldsQueryParam, - final Integer depthQueryParam) { + final @NotNull String resourceIdentifier, + final String acceptParam, + final String fieldsQueryParam, + final Integer depthQueryParam) { final var dataNode = fetchDataNodeFromDmiRegistryForCmHandle(cmHandle); final var dmiServiceName = String.valueOf(dataNode.getLeaves().get("dmi-service-name")); final Collection<DataNode> additionalPropsList = dataNode.getChildDataNodes(); final var jsonBody = prepareOperationBody(GenericRequestBody.OperationEnum.READ, additionalPropsList); final ResponseEntity<Object> response = dmiOperations.getResouceDataOperationalFromDmi(dmiServiceName, - cmHandle, - resourceIdentifier, - fieldsQueryParam, - depthQueryParam, - acceptParam, - jsonBody); + cmHandle, + resourceIdentifier, + fieldsQueryParam, + depthQueryParam, + acceptParam, + jsonBody); return handleResponse(response); } @Override public Object getResourceDataPassThroughRunningForCmHandle(final @NotNull String cmHandle, - final @NotNull String resourceIdentifier, - final String accept, - final String fields, - final Integer depth) { + final @NotNull String resourceIdentifier, + final String accept, + final String fields, + final Integer depth) { final var cmHandleDataNode = fetchDataNodeFromDmiRegistryForCmHandle(cmHandle); final var dmiServiceName = String.valueOf(cmHandleDataNode.getLeaves().get("dmi-service-name")); final Collection<DataNode> additionalPropsList = cmHandleDataNode.getChildDataNodes(); final var dmiRequesBody = prepareOperationBody(GenericRequestBody.OperationEnum.READ, additionalPropsList); final ResponseEntity<Object> response = dmiOperations.getResouceDataPassThroughRunningFromDmi(dmiServiceName, - cmHandle, - resourceIdentifier, - fields, - depth, - accept, - dmiRequesBody); + cmHandle, + resourceIdentifier, + fields, + depth, + accept, + dmiRequesBody); return handleResponse(response); } private DataNode fetchDataNodeFromDmiRegistryForCmHandle(final String cmHandle) { final String xpathForDmiRegistryToFetchCmHandle = "/dmi-registry/cm-handles[@id='" + cmHandle + "']"; final var dataNode = cpsDataService.getDataNode(NCMP_DATASPACE_NAME, - NCMP_DMI_REGISTRY_ANCHOR, - xpathForDmiRegistryToFetchCmHandle, - FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); + NCMP_DMI_REGISTRY_ANCHOR, + xpathForDmiRegistryToFetchCmHandle, + FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); return dataNode; } private String prepareOperationBody(final GenericRequestBody.OperationEnum operation, - final Collection<DataNode> additionalPropertyList) { + final Collection<DataNode> additionalPropertyList) { final var requestBody = new GenericRequestBody(); final Map<String, String> additionalPropertyMap = getAdditionalPropertiesMap(additionalPropertyList); requestBody.setOperation(GenericRequestBody.OperationEnum.READ); @@ -250,7 +252,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService } catch (final JsonProcessingException je) { log.error("Parsing error occurred while converting Object to JSON."); throw new NcmpException("Parsing error occurred while converting given object to JSON.", - je.getMessage()); + je.getMessage()); } } @@ -259,9 +261,9 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService return null; } final Map<String, String> additionalPropertyMap = new LinkedHashMap<>(); - for (final var node: additionalPropertyList) { + for (final var node : additionalPropertyList) { additionalPropertyMap.put(String.valueOf(node.getLeaves().get("name")), - String.valueOf(node.getLeaves().get("value"))); + String.valueOf(node.getLeaves().get("value"))); } return additionalPropertyMap; } @@ -271,17 +273,21 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService return responseEntity.getBody(); } else { throw new NcmpException("Not able to get resource data.", - "DMI status code: " + responseEntity.getStatusCodeValue() - + ", DMI response body: " + responseEntity.getBody()); + "DMI status code: " + responseEntity.getStatusCodeValue() + + ", DMI response body: " + responseEntity.getBody()); } } private PersistenceCmHandle toPersistenceCmHandle(final DmiPluginRegistration dmiPluginRegistration, - final CmHandle cmHandle) { + final CmHandle cmHandle) { final PersistenceCmHandle persistenceCmHandle = new PersistenceCmHandle(); persistenceCmHandle.setDmiServiceName(dmiPluginRegistration.getDmiPlugin()); persistenceCmHandle.setId(cmHandle.getCmHandleID()); - persistenceCmHandle.setAdditionalProperties(cmHandle.getCmHandleProperties()); + if (cmHandle.getCmHandleProperties() == null) { + persistenceCmHandle.setAdditionalProperties(Collections.EMPTY_MAP); + } else { + persistenceCmHandle.setAdditionalProperties(cmHandle.getCmHandleProperties()); + } return persistenceCmHandle; } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy index c6c955fba7..0760167181 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy @@ -136,6 +136,21 @@ class NetworkCmProxyDataServiceImplSpec extends Specification { 'create, update and delete' | [persistenceCmHandle ] | [persistenceCmHandle ] | cmHandlesArray || 1 | 1 | 1 } + + def 'Register a DMI Plugin for the given cmHandle without additional properties.'() { + given: 'a registration without cmHandle properties ' + def dmiPluginRegistration = new DmiPluginRegistration() + dmiPluginRegistration.dmiPlugin = 'my-server' + persistenceCmHandle.cmHandleID = '123' + persistenceCmHandle.cmHandleProperties = null + dmiPluginRegistration.createdCmHandles = [persistenceCmHandle ] + def expectedJsonData = '{"cm-handles":[{"id":"123","dmi-service-name":"my-server","additional-properties":[]}]}' + when: 'registration is updated' + objectUnderTest.updateDmiPluginRegistration(dmiPluginRegistration) + then: 'the CPS save list node data is invoked with the expected parameters' + 1 * mockCpsDataService.saveListNodeData('NCMP-Admin', 'ncmp-dmi-registry', '/dmi-registry', expectedJsonData) + } + def 'Get resource data for pass-through operational from dmi.'() { given: 'xpath' def xpath = "/dmi-registry/cm-handles[@id='testCmHandle']" diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index b45645bc42..8989dc80ef 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -23,6 +23,7 @@ package org.onap.cps.api.impl; import java.util.Collection; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsAdminService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; @@ -39,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service +@Slf4j public class CpsDataServiceImpl implements CpsDataService { private static final String ROOT_NODE_XPATH = "/"; @@ -62,7 +64,7 @@ public class CpsDataServiceImpl implements CpsDataService { public void saveData(final String dataspaceName, final String anchorName, final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData); cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -70,7 +72,7 @@ public class CpsDataServiceImpl implements CpsDataService { final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -79,7 +81,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<DataNode> dataNodesCollection = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodesCollection); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -94,7 +96,7 @@ public class CpsDataServiceImpl implements CpsDataService { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves()); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -114,7 +116,7 @@ public class CpsDataServiceImpl implements CpsDataService { final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -123,13 +125,13 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<DataNode> dataNodes = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath) { cpsDataPersistenceService.deleteListDataNodes(dataspaceName, anchorName, listNodeXpath); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @@ -169,6 +171,14 @@ public class CpsDataServiceImpl implements CpsDataService { } + private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName) { + try { + notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + } catch (final Exception exception) { + log.error("Failed to send message to notification service", exception); + } + } + private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) { return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext(); } diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java new file mode 100644 index 0000000000..4c961598e4 --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.config; + +import javax.validation.constraints.Min; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.validation.annotation.Validated; + +@EnableAsync +@Configuration +@ConfigurationProperties("notification.async-executor") +@Validated +@Setter +public class AsyncConfig { + + @Min(0) + private int corePoolSize = 2; + @Min(2) + private int maxPoolSize = 10; + @Min(0) + private int queueCapacity = Integer.MAX_VALUE; + private boolean waitForTasksToCompleteOnShutdown = true; + private String threadNamePrefix = "Async-"; + + /** + * Creates TaskExecutor for processing data-updated events. + * + * @return TaskExecutor + */ + @Bean("notificationExecutor") + public TaskExecutor getThreadAsyncExecutorForNotification() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown); + executor.setThreadNamePrefix(threadNamePrefix); + return executor; + } + +} diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java index e1473d3ff9..e0c8fe7055 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java +++ b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -27,7 +29,6 @@ import org.onap.cps.api.CpsAdminService; import org.onap.cps.api.CpsDataService; import org.onap.cps.event.model.Content; import org.onap.cps.event.model.CpsDataUpdatedEvent; -import org.onap.cps.event.model.CpsDataUpdatedEvent.Schema; import org.onap.cps.event.model.Data; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.model.Anchor; @@ -36,15 +37,17 @@ import org.onap.cps.utils.DataMapUtils; import org.springframework.stereotype.Component; @Component -class CpsDataUpdatedEventFactory { +public class CpsDataUpdatedEventFactory { + private static final URI EVENT_SCHEMA; private static final URI EVENT_SOURCE; private static final String EVENT_TYPE = "org.onap.cps.data-updated-event"; private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - static { + static { try { + EVENT_SCHEMA = new URI("urn:cps:org.onap.cps:data-updated-event-schema:v1"); EVENT_SOURCE = new URI("urn:cps:org.onap.cps"); } catch (final URISyntaxException e) { // As it is fixed string, I don't expect to see this error @@ -52,15 +55,22 @@ class CpsDataUpdatedEventFactory { } } - private CpsDataService cpsDataService; - private CpsAdminService cpsAdminService; + private final CpsDataService cpsDataService; + private final CpsAdminService cpsAdminService; public CpsDataUpdatedEventFactory(final CpsDataService cpsDataService, final CpsAdminService cpsAdminService) { this.cpsDataService = cpsDataService; this.cpsAdminService = cpsAdminService; } - CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) { + /** + * Generates CPS Data Updated event. + * + * @param dataspaceName dataspaceName + * @param anchorName anchorName + * @return CpsDataUpdatedEvent + */ + public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) { final var dataNode = cpsDataService .getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); @@ -71,7 +81,7 @@ class CpsDataUpdatedEventFactory { final var cpsDataUpdatedEvent = new CpsDataUpdatedEvent(); cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode)); cpsDataUpdatedEvent.withId(UUID.randomUUID().toString()); - cpsDataUpdatedEvent.withSchema(Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT); + cpsDataUpdatedEvent.withSchema(EVENT_SCHEMA); cpsDataUpdatedEvent.withSource(EVENT_SOURCE); cpsDataUpdatedEvent.withType(EVENT_TYPE); return cpsDataUpdatedEvent; diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java index 1ab032b57c..2d87488245 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java @@ -1,12 +1,13 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. + * Copyright (c) 2021 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,7 +55,7 @@ public class NotificationPublisher { * * @param cpsDataUpdatedEvent event to be sent to kafka */ - void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) { final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + "," + cpsDataUpdatedEvent.getContent().getAnchorName(); log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ", diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java index 9cb2c52e01..4745739a4f 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -21,10 +23,12 @@ package org.onap.cps.notification; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @@ -45,12 +49,12 @@ public class NotificationService { * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent * @param notificationErrorHandler error handler */ - @Autowired public NotificationService( final NotificationProperties notificationProperties, final NotificationPublisher notificationPublisher, final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory, final NotificationErrorHandler notificationErrorHandler) { + log.info("Notification Properties {}", notificationProperties); this.notificationProperties = notificationProperties; this.notificationPublisher = notificationPublisher; this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory; @@ -75,8 +79,10 @@ public class NotificationService { * * @param dataspaceName dataspace name * @param anchorName anchor name + * @return future */ - public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) { + @Async("notificationExecutor") + public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) { log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName); try { if (shouldSendNotification(dataspaceName)) { @@ -92,6 +98,7 @@ public class NotificationService { notificationErrorHandler.onException("Failed to process cps-data-updated-event.", exception, dataspaceName, anchorName); } + return CompletableFuture.completedFuture(null); } /* diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy index aecc3f7ee0..2ce77bd1a8 100644 --- a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy @@ -21,7 +21,6 @@ package org.onap.cps.notification import org.onap.cps.api.CpsAdminService import org.onap.cps.api.CpsDataService -import org.onap.cps.event.model.CpsDataUpdatedEvent import org.onap.cps.event.model.Data import org.onap.cps.spi.FetchDescendantsOption import org.onap.cps.spi.model.Anchor @@ -61,7 +60,7 @@ class CpsDataUpdateEventFactorySpec extends Specification { with(cpsDataUpdatedEvent) { type == 'org.onap.cps.data-updated-event' source == new URI('urn:cps:org.onap.cps') - schema == CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT + schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1') StringUtils.hasText(id) content != null } diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy index b60d093236..ab727671e1 100644 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy @@ -1,12 +1,13 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. + * Copyright (c) 2021 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,30 +20,37 @@ package org.onap.cps.notification +import org.onap.cps.config.AsyncConfig import org.onap.cps.event.model.CpsDataUpdatedEvent import org.spockframework.spring.SpringBean +import org.spockframework.spring.SpringSpy import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.test.context.SpringBootTest +import org.springframework.scheduling.annotation.EnableAsync import org.springframework.test.context.ContextConfiguration import spock.lang.Shared import spock.lang.Specification +import java.util.concurrent.TimeUnit + @SpringBootTest +@EnableAsync @EnableConfigurationProperties -@ContextConfiguration(classes = [NotificationProperties]) +@ContextConfiguration(classes = [NotificationProperties, NotificationService, NotificationErrorHandler, AsyncConfig]) class NotificationServiceSpec extends Specification { @SpringBean NotificationPublisher mockNotificationPublisher = Mock() @SpringBean - NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler()) - @SpringBean CpsDataUpdatedEventFactory mockCpsDataUpdatedEventFactory = Mock() + @SpringSpy + NotificationErrorHandler spyNotificationErrorHandler + @SpringSpy + NotificationProperties spyNotificationProperties @Autowired - NotificationProperties notificationProperties - NotificationProperties spyNotificationProperties + NotificationService objectUnderTest @Shared def myDataspacePublishedName = 'my-dataspace-published' @@ -50,7 +58,7 @@ class NotificationServiceSpec extends Specification { def 'Skip sending notification when disabled.'() { given: 'notification is disabled' - def objectUnderTest = createNotificationService(false) + spyNotificationProperties.isEnabled() >> false when: 'dataUpdatedEvent is received' objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) then: 'the notification is not sent' @@ -59,13 +67,17 @@ class NotificationServiceSpec extends Specification { def 'Send notification when enabled: #scenario.'() { given: 'notification is enabled' - def objectUnderTest = createNotificationService(true) + spyNotificationProperties.isEnabled() >> true and: 'event factory can create event successfully' def cpsDataUpdatedEvent = new CpsDataUpdatedEvent() mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, myAnchorName) >> cpsDataUpdatedEvent when: 'dataUpdatedEvent is received' - objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName) - then: 'notification is sent' + def future = objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName) + and: 'wait for async processing is completed' + future.get(10, TimeUnit.SECONDS) + then: 'async process completed successfully' + future.isDone() + and: 'notification is sent' expectedSendNotificationCount * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent) where: scenario | dataspaceName || expectedSendNotificationCount @@ -75,13 +87,17 @@ class NotificationServiceSpec extends Specification { def 'Error handling in notification service.'() { given: 'notification is enabled' - def objectUnderTest = createNotificationService(true) + spyNotificationProperties.isEnabled() >> true and: 'event factory can not create event successfully' mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(myDataspacePublishedName, myAnchorName) >> { throw new Exception("Could not create event") } when: 'event is sent for processing' - objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) - then: 'error is handled and not thrown to caller' + def future = objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) + and: 'wait for async processing is completed' + future.get(10, TimeUnit.SECONDS) + then: 'async process completed successfully' + future.isDone() + and: 'error is handled and not thrown to caller' notThrown Exception 1 * spyNotificationErrorHandler.onException(_, _, _, _) } diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml index 94f7e818f4..b1546d74f3 100644 --- a/cps-service/src/test/resources/application.yml +++ b/cps-service/src/test/resources/application.yml @@ -1,11 +1,12 @@ # ============LICENSE_START======================================================= -# Copyright (C) 2021 Bell Canada. All rights reserved. +# Copyright (c) 2021 Bell Canada. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,6 +22,10 @@ notification: enabled-dataspaces: ".*-published,.*-important" enabled: true topic: cps-event + async-executor: + core-pool-size: 2 + max-pool-size: 10 + queue-capacity: 0 spring: kafka: diff --git a/docker-compose/README.md b/docker-compose/README.md index 11ccf6150b..ceb05446b4 100644 --- a/docker-compose/README.md +++ b/docker-compose/README.md @@ -2,6 +2,7 @@ ============LICENSE_START======================================================= Copyright (C) 2020 Pantheon.tech Modifications Copyright (C) 2020-2021 Nordix Foundation. + Modifications Copyright (C) 2021 Bell Canada. ================================================================================ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -52,11 +53,12 @@ It starts both Postgres database and CPS services. 1. Edit `docker-compose.yml` 1. uncomment desired service to be deployed, by default `cps-and-ncmp` is enabled. You can comment it and uncomment `cps-standalone` or `ncmp-standalone`. - 2. To send data-updated events to kafka, - * uncomment the `zookeeper` and `kafka` services. - * uncomment environment variables + 2. To send data-updated events to kafka, + * uncomment the `zookeeper` and `kafka` services. + * uncomment environment variables * `notification.data-updated.enabled: 'true'` * `KAFKA_BOOTSTRAP_SERVER: kafka:9092` + * `NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'` 2. Execute following command from `docker-compose` folder: Use one of the below version type that has been generated in the local system's docker image list after the build. diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index e659b0879c..d97cbeb847 100755 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -33,6 +33,7 @@ services: # DB_PASSWORD: ${DB_PASSWORD:-cps} # #KAFKA_BOOTSTRAP_SERVER: kafka:9092 # #notification.data-updated.enabled: 'true' + # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' # restart: unless-stopped # depends_on: # - dbpostgresql @@ -53,6 +54,7 @@ services: # DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} # #KAFKA_BOOTSTRAP_SERVER: kafka:9092 # #notification.data-updated.enabled: 'true' + # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' # restart: unless-stopped # depends_on: # - dbpostgresql @@ -73,6 +75,7 @@ services: DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} #KAFKA_BOOTSTRAP_SERVER: kafka:9092 #notification.data-updated.enabled: 'true' + #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' restart: unless-stopped depends_on: - dbpostgresql |