summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-application/src/main/resources/application.yml11
-rw-r--r--cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java2
-rw-r--r--cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json11
-rw-r--r--cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy11
-rw-r--r--cps-events/src/test/resources/bookstore-chapters.json6
-rwxr-xr-xcps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java80
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy15
-rwxr-xr-xcps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java24
-rw-r--r--cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java65
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java26
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java7
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationService.java15
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy3
-rw-r--r--cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy44
-rw-r--r--cps-service/src/test/resources/application.yml9
-rw-r--r--docker-compose/README.md8
-rwxr-xr-xdocker-compose/docker-compose.yml3
17 files changed, 242 insertions, 98 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index e185bf23f..39309d842 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -71,9 +71,16 @@ spring:
notification:
data-updated:
enabled: false
- topic: ${CPS_CHANGE_EVENT_TOPIC:cps.cfg-state-events}
+ topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events}
filters:
- enabled-dataspaces: ${DATASPACE_FILTER_PATTERNS:""}
+ enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}
+ async-executor:
+ core-pool-size: 2
+ max-pool-size: 10
+ queue-capacity: 500
+ wait-for-tasks-to-complete-on-shutdown: true
+ thread-name-prefix: Async-
+
springdoc:
swagger-ui:
diff --git a/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java b/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java
index 24454615a..4228c0b93 100644
--- a/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java
+++ b/cps-application/src/test/java/org/onap/cps/architecture/LayeredArchitectureTest.java
@@ -38,7 +38,7 @@ public class LayeredArchitectureTest {
private static final String NCMP_REST_PACKAGE = "org.onap.cps.ncmp.rest..";
private static final String API_SERVICE_PACKAGE = "org.onap.cps.api..";
private static final String SPI_SERVICE_PACKAGE = "org.onap.cps.spi..";
- private static final String NCMP_SERVICE_PACKAGE = "org.onap.cps.ncmp.api..";
+ private static final String NCMP_SERVICE_PACKAGE = "org.onap.cps.ncmp.api..";
private static final String SPI_REPOSITORY_PACKAGE = "org.onap.cps.spi.repository..";
private static final String YANG_SCHEMA_PACKAGE = "org.onap.cps.yang..";
diff --git a/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json b/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json
index de445ec72..95dc605da 100644
--- a/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json
+++ b/cps-events/src/main/resources/schemas/cps-data-updated-event-schema.json
@@ -1,7 +1,7 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
+ "$id": "urn:cps:org.onap.cps:data-updated-event-schema:v1",
"$ref": "#/definitions/CpsDataUpdatedEvent",
@@ -12,10 +12,9 @@
"type": "object",
"properties": {
"schema": {
- "description": "The schema, including its version, that this event adheres to.",
+ "description": "The schema, including its version, that this event adheres to. E.g. 'urn:cps:org.onap.cps:data-updated-event-schema:v99'.",
"type": "string",
- "default": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
- "enum": ["urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT"]
+ "format": "uri"
},
"id": {
"description": "The unique id identifying the event for the specified source. Producer must ensure that source + id is unique for each distinct event.",
@@ -69,7 +68,7 @@
}
},
"required": [
- "timestamp",
+ "observedTimestamp",
"dataspaceName",
"schemaSetName",
"anchorName",
@@ -85,4 +84,4 @@
}
-} \ No newline at end of file
+}
diff --git a/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy b/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy
index f72eceed1..545b18337 100644
--- a/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy
+++ b/cps-events/src/test/groovy/org/onap/cps/event/CpsDataUpdatedEventSpec.groovy
@@ -33,13 +33,13 @@ class CpsDataUpdatedEventSpec extends Specification {
def objectMapper = new ObjectMapper()
final DATASPACE_NAME = 'my-dataspace'
- final BOOKSTORE_SCHEMA_SET = 'bootstore-schemaset'
+ final BOOKSTORE_SCHEMA_SET = 'bookstore-schemaset'
final ANCHOR_NAME = 'chapters'
final EVENT_TIMESTAMP = '2020-12-01T00:00:00.000+0000'
final EVENT_ID = '77b8f114-4562-4069-8234-6d059ff742ac'
final EVENT_SOURCE = new URI('urn:cps:org.onap.cps')
final EVENT_TYPE = 'org.onap.cps.data-updated-event'
- final EVENT_SCHEMA = 'urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT'
+ final EVENT_SCHEMA = new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
final DATA = [
'test:bookstore': [
@@ -67,7 +67,7 @@ class CpsDataUpdatedEventSpec extends Specification {
then: 'CpsDataUpdatedEvent POJO has the excepted values'
cpsDataUpdatedEvent.id == EVENT_ID
cpsDataUpdatedEvent.source == EVENT_SOURCE
- cpsDataUpdatedEvent.schema.value() == EVENT_SCHEMA
+ cpsDataUpdatedEvent.schema == EVENT_SCHEMA
cpsDataUpdatedEvent.type == EVENT_TYPE
def content = cpsDataUpdatedEvent.content
content.observedTimestamp == EVENT_TIMESTAMP
@@ -90,8 +90,7 @@ class CpsDataUpdatedEventSpec extends Specification {
and: 'CpsDataUpdatedEvent with the content'
def cpsDataUpdateEvent = new CpsDataUpdatedEvent()
cpsDataUpdateEvent
- .withSchema(
- CpsDataUpdatedEvent.Schema.fromValue(EVENT_SCHEMA))
+ .withSchema(EVENT_SCHEMA)
.withId(EVENT_ID)
.withSource(EVENT_SOURCE)
.withType(EVENT_TYPE)
@@ -111,4 +110,4 @@ class CpsDataUpdatedEventSpec extends Specification {
)
}
-} \ No newline at end of file
+}
diff --git a/cps-events/src/test/resources/bookstore-chapters.json b/cps-events/src/test/resources/bookstore-chapters.json
index de46b7184..753426a60 100644
--- a/cps-events/src/test/resources/bookstore-chapters.json
+++ b/cps-events/src/test/resources/bookstore-chapters.json
@@ -1,12 +1,12 @@
{
- "schema": "urn:cps:org.onap.cps:data-updated-event-schema:1.1.0-SNAPSHOT",
+ "schema": "urn:cps:org.onap.cps:data-updated-event-schema:v1",
"id": "77b8f114-4562-4069-8234-6d059ff742ac",
"source": "urn:cps:org.onap.cps",
"type": "org.onap.cps.data-updated-event",
"content": {
"observedTimestamp": "2020-12-01T00:00:00.000+0000",
"dataspaceName": "my-dataspace",
- "schemaSetName": "bootstore-schemaset",
+ "schemaSetName": "bookstore-schemaset",
"anchorName": "chapters",
"data": {
"test:bookstore":{
@@ -31,4 +31,4 @@
}
}
}
-} \ No newline at end of file
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
index ca5fa8e08..235030a84 100755
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java
@@ -25,6 +25,7 @@ package org.onap.cps.ncmp.api.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -71,10 +72,11 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
/**
* Constructor Injection for Dependencies.
- * @param dmiOperations dmi operation
- * @param cpsDataService Data Service Interface
+ *
+ * @param dmiOperations dmi operation
+ * @param cpsDataService Data Service Interface
* @param cpsQueryService Query Service Interface
- * @param objectMapper Object Mapper
+ * @param objectMapper Object Mapper
*/
public NetworkCmProxyDataServiceImpl(final DmiOperations dmiOperations, final CpsDataService cpsDataService,
final CpsQueryService cpsQueryService, final ObjectMapper objectMapper) {
@@ -96,7 +98,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
@Override
public Collection<DataNode> queryDataNodes(final String cmHandle, final String cpsPath,
- final FetchDescendantsOption fetchDescendantsOption) {
+ final FetchDescendantsOption fetchDescendantsOption) {
return cpsQueryService.queryDataNodes(getDataspaceName(), cmHandle, cpsPath, fetchDescendantsOption);
}
@@ -141,7 +143,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
try {
final List<PersistenceCmHandle> createdPersistenceCmHandles =
new LinkedList<>();
- for (final CmHandle cmHandle: dmiPluginRegistration.getCreatedCmHandles()) {
+ for (final CmHandle cmHandle : dmiPluginRegistration.getCreatedCmHandles()) {
createdPersistenceCmHandles.add(toPersistenceCmHandle(dmiPluginRegistration, cmHandle));
}
final PersistenceCmHandlesList persistenceCmHandlesList = new PersistenceCmHandlesList();
@@ -161,7 +163,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
try {
final List<PersistenceCmHandle> updatedPersistenceCmHandles =
new LinkedList<>();
- for (final CmHandle cmHandle: dmiPluginRegistration.getUpdatedCmHandles()) {
+ for (final CmHandle cmHandle : dmiPluginRegistration.getUpdatedCmHandles()) {
updatedPersistenceCmHandles.add(toPersistenceCmHandle(dmiPluginRegistration, cmHandle));
}
final PersistenceCmHandlesList persistenceCmHandlesList = new PersistenceCmHandlesList();
@@ -190,56 +192,56 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
@Override
public Object getResourceDataOperationalForCmHandle(final @NotNull String cmHandle,
- final @NotNull String resourceIdentifier,
- final String acceptParam,
- final String fieldsQueryParam,
- final Integer depthQueryParam) {
+ final @NotNull String resourceIdentifier,
+ final String acceptParam,
+ final String fieldsQueryParam,
+ final Integer depthQueryParam) {
final var dataNode = fetchDataNodeFromDmiRegistryForCmHandle(cmHandle);
final var dmiServiceName = String.valueOf(dataNode.getLeaves().get("dmi-service-name"));
final Collection<DataNode> additionalPropsList = dataNode.getChildDataNodes();
final var jsonBody = prepareOperationBody(GenericRequestBody.OperationEnum.READ, additionalPropsList);
final ResponseEntity<Object> response = dmiOperations.getResouceDataOperationalFromDmi(dmiServiceName,
- cmHandle,
- resourceIdentifier,
- fieldsQueryParam,
- depthQueryParam,
- acceptParam,
- jsonBody);
+ cmHandle,
+ resourceIdentifier,
+ fieldsQueryParam,
+ depthQueryParam,
+ acceptParam,
+ jsonBody);
return handleResponse(response);
}
@Override
public Object getResourceDataPassThroughRunningForCmHandle(final @NotNull String cmHandle,
- final @NotNull String resourceIdentifier,
- final String accept,
- final String fields,
- final Integer depth) {
+ final @NotNull String resourceIdentifier,
+ final String accept,
+ final String fields,
+ final Integer depth) {
final var cmHandleDataNode = fetchDataNodeFromDmiRegistryForCmHandle(cmHandle);
final var dmiServiceName = String.valueOf(cmHandleDataNode.getLeaves().get("dmi-service-name"));
final Collection<DataNode> additionalPropsList = cmHandleDataNode.getChildDataNodes();
final var dmiRequesBody = prepareOperationBody(GenericRequestBody.OperationEnum.READ, additionalPropsList);
final ResponseEntity<Object> response = dmiOperations.getResouceDataPassThroughRunningFromDmi(dmiServiceName,
- cmHandle,
- resourceIdentifier,
- fields,
- depth,
- accept,
- dmiRequesBody);
+ cmHandle,
+ resourceIdentifier,
+ fields,
+ depth,
+ accept,
+ dmiRequesBody);
return handleResponse(response);
}
private DataNode fetchDataNodeFromDmiRegistryForCmHandle(final String cmHandle) {
final String xpathForDmiRegistryToFetchCmHandle = "/dmi-registry/cm-handles[@id='" + cmHandle + "']";
final var dataNode = cpsDataService.getDataNode(NCMP_DATASPACE_NAME,
- NCMP_DMI_REGISTRY_ANCHOR,
- xpathForDmiRegistryToFetchCmHandle,
- FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+ NCMP_DMI_REGISTRY_ANCHOR,
+ xpathForDmiRegistryToFetchCmHandle,
+ FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
return dataNode;
}
private String prepareOperationBody(final GenericRequestBody.OperationEnum operation,
- final Collection<DataNode> additionalPropertyList) {
+ final Collection<DataNode> additionalPropertyList) {
final var requestBody = new GenericRequestBody();
final Map<String, String> additionalPropertyMap = getAdditionalPropertiesMap(additionalPropertyList);
requestBody.setOperation(GenericRequestBody.OperationEnum.READ);
@@ -250,7 +252,7 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
} catch (final JsonProcessingException je) {
log.error("Parsing error occurred while converting Object to JSON.");
throw new NcmpException("Parsing error occurred while converting given object to JSON.",
- je.getMessage());
+ je.getMessage());
}
}
@@ -259,9 +261,9 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
return null;
}
final Map<String, String> additionalPropertyMap = new LinkedHashMap<>();
- for (final var node: additionalPropertyList) {
+ for (final var node : additionalPropertyList) {
additionalPropertyMap.put(String.valueOf(node.getLeaves().get("name")),
- String.valueOf(node.getLeaves().get("value")));
+ String.valueOf(node.getLeaves().get("value")));
}
return additionalPropertyMap;
}
@@ -271,17 +273,21 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService
return responseEntity.getBody();
} else {
throw new NcmpException("Not able to get resource data.",
- "DMI status code: " + responseEntity.getStatusCodeValue()
- + ", DMI response body: " + responseEntity.getBody());
+ "DMI status code: " + responseEntity.getStatusCodeValue()
+ + ", DMI response body: " + responseEntity.getBody());
}
}
private PersistenceCmHandle toPersistenceCmHandle(final DmiPluginRegistration dmiPluginRegistration,
- final CmHandle cmHandle) {
+ final CmHandle cmHandle) {
final PersistenceCmHandle persistenceCmHandle = new PersistenceCmHandle();
persistenceCmHandle.setDmiServiceName(dmiPluginRegistration.getDmiPlugin());
persistenceCmHandle.setId(cmHandle.getCmHandleID());
- persistenceCmHandle.setAdditionalProperties(cmHandle.getCmHandleProperties());
+ if (cmHandle.getCmHandleProperties() == null) {
+ persistenceCmHandle.setAdditionalProperties(Collections.EMPTY_MAP);
+ } else {
+ persistenceCmHandle.setAdditionalProperties(cmHandle.getCmHandleProperties());
+ }
return persistenceCmHandle;
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
index c6c955fba..076016718 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
@@ -136,6 +136,21 @@ class NetworkCmProxyDataServiceImplSpec extends Specification {
'create, update and delete' | [persistenceCmHandle ] | [persistenceCmHandle ] | cmHandlesArray || 1 | 1 | 1
}
+
+ def 'Register a DMI Plugin for the given cmHandle without additional properties.'() {
+ given: 'a registration without cmHandle properties '
+ def dmiPluginRegistration = new DmiPluginRegistration()
+ dmiPluginRegistration.dmiPlugin = 'my-server'
+ persistenceCmHandle.cmHandleID = '123'
+ persistenceCmHandle.cmHandleProperties = null
+ dmiPluginRegistration.createdCmHandles = [persistenceCmHandle ]
+ def expectedJsonData = '{"cm-handles":[{"id":"123","dmi-service-name":"my-server","additional-properties":[]}]}'
+ when: 'registration is updated'
+ objectUnderTest.updateDmiPluginRegistration(dmiPluginRegistration)
+ then: 'the CPS save list node data is invoked with the expected parameters'
+ 1 * mockCpsDataService.saveListNodeData('NCMP-Admin', 'ncmp-dmi-registry', '/dmi-registry', expectedJsonData)
+ }
+
def 'Get resource data for pass-through operational from dmi.'() {
given: 'xpath'
def xpath = "/dmi-registry/cm-handles[@id='testCmHandle']"
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index b45645bc4..8989dc80e 100755
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -23,6 +23,7 @@
package org.onap.cps.api.impl;
import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsModuleService;
@@ -39,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
+@Slf4j
public class CpsDataServiceImpl implements CpsDataService {
private static final String ROOT_NODE_XPATH = "/";
@@ -62,7 +64,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void saveData(final String dataspaceName, final String anchorName, final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData);
cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -70,7 +72,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -79,7 +81,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<DataNode> dataNodesCollection =
buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.addListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodesCollection);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -94,7 +96,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService
.updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -114,7 +116,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -123,13 +125,13 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<DataNode> dataNodes =
buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.replaceListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath) {
cpsDataPersistenceService.deleteListDataNodes(dataspaceName, anchorName, listNodeXpath);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@@ -169,6 +171,14 @@ public class CpsDataServiceImpl implements CpsDataService {
}
+ private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName) {
+ try {
+ notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ } catch (final Exception exception) {
+ log.error("Failed to send message to notification service", exception);
+ }
+ }
+
private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
}
diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
new file mode 100644
index 000000000..4c961598e
--- /dev/null
+++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.config;
+
+import javax.validation.constraints.Min;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.validation.annotation.Validated;
+
+@EnableAsync
+@Configuration
+@ConfigurationProperties("notification.async-executor")
+@Validated
+@Setter
+public class AsyncConfig {
+
+ @Min(0)
+ private int corePoolSize = 2;
+ @Min(2)
+ private int maxPoolSize = 10;
+ @Min(0)
+ private int queueCapacity = Integer.MAX_VALUE;
+ private boolean waitForTasksToCompleteOnShutdown = true;
+ private String threadNamePrefix = "Async-";
+
+ /**
+ * Creates TaskExecutor for processing data-updated events.
+ *
+ * @return TaskExecutor
+ */
+ @Bean("notificationExecutor")
+ public TaskExecutor getThreadAsyncExecutorForNotification() {
+ final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(corePoolSize);
+ executor.setMaxPoolSize(maxPoolSize);
+ executor.setQueueCapacity(queueCapacity);
+ executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
+ executor.setThreadNamePrefix(threadNamePrefix);
+ return executor;
+ }
+
+}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
index e1473d3ff..e0c8fe705 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -27,7 +29,6 @@ import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.event.model.Content;
import org.onap.cps.event.model.CpsDataUpdatedEvent;
-import org.onap.cps.event.model.CpsDataUpdatedEvent.Schema;
import org.onap.cps.event.model.Data;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.model.Anchor;
@@ -36,15 +37,17 @@ import org.onap.cps.utils.DataMapUtils;
import org.springframework.stereotype.Component;
@Component
-class CpsDataUpdatedEventFactory {
+public class CpsDataUpdatedEventFactory {
+ private static final URI EVENT_SCHEMA;
private static final URI EVENT_SOURCE;
private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
- static {
+ static {
try {
+ EVENT_SCHEMA = new URI("urn:cps:org.onap.cps:data-updated-event-schema:v1");
EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
} catch (final URISyntaxException e) {
// As it is fixed string, I don't expect to see this error
@@ -52,15 +55,22 @@ class CpsDataUpdatedEventFactory {
}
}
- private CpsDataService cpsDataService;
- private CpsAdminService cpsAdminService;
+ private final CpsDataService cpsDataService;
+ private final CpsAdminService cpsAdminService;
public CpsDataUpdatedEventFactory(final CpsDataService cpsDataService, final CpsAdminService cpsAdminService) {
this.cpsDataService = cpsDataService;
this.cpsAdminService = cpsAdminService;
}
- CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) {
+ /**
+ * Generates CPS Data Updated event.
+ *
+ * @param dataspaceName dataspaceName
+ * @param anchorName anchorName
+ * @return CpsDataUpdatedEvent
+ */
+ public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) {
final var dataNode = cpsDataService
.getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
@@ -71,7 +81,7 @@ class CpsDataUpdatedEventFactory {
final var cpsDataUpdatedEvent = new CpsDataUpdatedEvent();
cpsDataUpdatedEvent.withContent(createContent(anchor, dataNode));
cpsDataUpdatedEvent.withId(UUID.randomUUID().toString());
- cpsDataUpdatedEvent.withSchema(Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT);
+ cpsDataUpdatedEvent.withSchema(EVENT_SCHEMA);
cpsDataUpdatedEvent.withSource(EVENT_SOURCE);
cpsDataUpdatedEvent.withType(EVENT_TYPE);
return cpsDataUpdatedEvent;
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
index 1ab032b57..2d8748824 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
@@ -1,12 +1,13 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
+ * Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +55,7 @@ public class NotificationPublisher {
*
* @param cpsDataUpdatedEvent event to be sent to kafka
*/
- void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+ public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
+ cpsDataUpdatedEvent.getContent().getAnchorName();
log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
index 9cb2c52e0..4745739a4 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -21,10 +23,12 @@ package org.onap.cps.notification;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@@ -45,12 +49,12 @@ public class NotificationService {
* @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
* @param notificationErrorHandler error handler
*/
- @Autowired
public NotificationService(
final NotificationProperties notificationProperties,
final NotificationPublisher notificationPublisher,
final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
final NotificationErrorHandler notificationErrorHandler) {
+ log.info("Notification Properties {}", notificationProperties);
this.notificationProperties = notificationProperties;
this.notificationPublisher = notificationPublisher;
this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
@@ -75,8 +79,10 @@ public class NotificationService {
*
* @param dataspaceName dataspace name
* @param anchorName anchor name
+ * @return future
*/
- public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
+ @Async("notificationExecutor")
+ public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
try {
if (shouldSendNotification(dataspaceName)) {
@@ -92,6 +98,7 @@ public class NotificationService {
notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
exception, dataspaceName, anchorName);
}
+ return CompletableFuture.completedFuture(null);
}
/*
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy
index aecc3f7ee..2ce77bd1a 100644
--- a/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/notification/CpsDataUpdateEventFactorySpec.groovy
@@ -21,7 +21,6 @@ package org.onap.cps.notification
import org.onap.cps.api.CpsAdminService
import org.onap.cps.api.CpsDataService
-import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.onap.cps.event.model.Data
import org.onap.cps.spi.FetchDescendantsOption
import org.onap.cps.spi.model.Anchor
@@ -61,7 +60,7 @@ class CpsDataUpdateEventFactorySpec extends Specification {
with(cpsDataUpdatedEvent) {
type == 'org.onap.cps.data-updated-event'
source == new URI('urn:cps:org.onap.cps')
- schema == CpsDataUpdatedEvent.Schema.URN_CPS_ORG_ONAP_CPS_DATA_UPDATED_EVENT_SCHEMA_1_1_0_SNAPSHOT
+ schema == new URI('urn:cps:org.onap.cps:data-updated-event-schema:v1')
StringUtils.hasText(id)
content != null
}
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
index b60d09323..ab727671e 100644
--- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
@@ -1,12 +1,13 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
+ * Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,30 +20,37 @@
package org.onap.cps.notification
+import org.onap.cps.config.AsyncConfig
import org.onap.cps.event.model.CpsDataUpdatedEvent
import org.spockframework.spring.SpringBean
+import org.spockframework.spring.SpringSpy
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.scheduling.annotation.EnableAsync
import org.springframework.test.context.ContextConfiguration
import spock.lang.Shared
import spock.lang.Specification
+import java.util.concurrent.TimeUnit
+
@SpringBootTest
+@EnableAsync
@EnableConfigurationProperties
-@ContextConfiguration(classes = [NotificationProperties])
+@ContextConfiguration(classes = [NotificationProperties, NotificationService, NotificationErrorHandler, AsyncConfig])
class NotificationServiceSpec extends Specification {
@SpringBean
NotificationPublisher mockNotificationPublisher = Mock()
@SpringBean
- NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler())
- @SpringBean
CpsDataUpdatedEventFactory mockCpsDataUpdatedEventFactory = Mock()
+ @SpringSpy
+ NotificationErrorHandler spyNotificationErrorHandler
+ @SpringSpy
+ NotificationProperties spyNotificationProperties
@Autowired
- NotificationProperties notificationProperties
- NotificationProperties spyNotificationProperties
+ NotificationService objectUnderTest
@Shared
def myDataspacePublishedName = 'my-dataspace-published'
@@ -50,7 +58,7 @@ class NotificationServiceSpec extends Specification {
def 'Skip sending notification when disabled.'() {
given: 'notification is disabled'
- def objectUnderTest = createNotificationService(false)
+ spyNotificationProperties.isEnabled() >> false
when: 'dataUpdatedEvent is received'
objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName)
then: 'the notification is not sent'
@@ -59,13 +67,17 @@ class NotificationServiceSpec extends Specification {
def 'Send notification when enabled: #scenario.'() {
given: 'notification is enabled'
- def objectUnderTest = createNotificationService(true)
+ spyNotificationProperties.isEnabled() >> true
and: 'event factory can create event successfully'
def cpsDataUpdatedEvent = new CpsDataUpdatedEvent()
mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, myAnchorName) >> cpsDataUpdatedEvent
when: 'dataUpdatedEvent is received'
- objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName)
- then: 'notification is sent'
+ def future = objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName)
+ and: 'wait for async processing is completed'
+ future.get(10, TimeUnit.SECONDS)
+ then: 'async process completed successfully'
+ future.isDone()
+ and: 'notification is sent'
expectedSendNotificationCount * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent)
where:
scenario | dataspaceName || expectedSendNotificationCount
@@ -75,13 +87,17 @@ class NotificationServiceSpec extends Specification {
def 'Error handling in notification service.'() {
given: 'notification is enabled'
- def objectUnderTest = createNotificationService(true)
+ spyNotificationProperties.isEnabled() >> true
and: 'event factory can not create event successfully'
mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(myDataspacePublishedName, myAnchorName) >>
{ throw new Exception("Could not create event") }
when: 'event is sent for processing'
- objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName)
- then: 'error is handled and not thrown to caller'
+ def future = objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName)
+ and: 'wait for async processing is completed'
+ future.get(10, TimeUnit.SECONDS)
+ then: 'async process completed successfully'
+ future.isDone()
+ and: 'error is handled and not thrown to caller'
notThrown Exception
1 * spyNotificationErrorHandler.onException(_, _, _, _)
}
diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml
index 94f7e818f..b1546d74f 100644
--- a/cps-service/src/test/resources/application.yml
+++ b/cps-service/src/test/resources/application.yml
@@ -1,11 +1,12 @@
# ============LICENSE_START=======================================================
-# Copyright (C) 2021 Bell Canada. All rights reserved.
+# Copyright (c) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +22,10 @@ notification:
enabled-dataspaces: ".*-published,.*-important"
enabled: true
topic: cps-event
+ async-executor:
+ core-pool-size: 2
+ max-pool-size: 10
+ queue-capacity: 0
spring:
kafka:
diff --git a/docker-compose/README.md b/docker-compose/README.md
index 11ccf6150..ceb05446b 100644
--- a/docker-compose/README.md
+++ b/docker-compose/README.md
@@ -2,6 +2,7 @@
============LICENSE_START=======================================================
Copyright (C) 2020 Pantheon.tech
Modifications Copyright (C) 2020-2021 Nordix Foundation.
+ Modifications Copyright (C) 2021 Bell Canada.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -52,11 +53,12 @@ It starts both Postgres database and CPS services.
1. Edit `docker-compose.yml`
1. uncomment desired service to be deployed, by default `cps-and-ncmp` is enabled. You can comment it and uncomment `cps-standalone` or `ncmp-standalone`.
- 2. To send data-updated events to kafka,
- * uncomment the `zookeeper` and `kafka` services.
- * uncomment environment variables
+ 2. To send data-updated events to kafka,
+ * uncomment the `zookeeper` and `kafka` services.
+ * uncomment environment variables
* `notification.data-updated.enabled: 'true'`
* `KAFKA_BOOTSTRAP_SERVER: kafka:9092`
+ * `NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'`
2. Execute following command from `docker-compose` folder:
Use one of the below version type that has been generated in the local system's docker image list after the build.
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index e659b0879..d97cbeb84 100755
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -33,6 +33,7 @@ services:
# DB_PASSWORD: ${DB_PASSWORD:-cps}
# #KAFKA_BOOTSTRAP_SERVER: kafka:9092
# #notification.data-updated.enabled: 'true'
+ # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
# restart: unless-stopped
# depends_on:
# - dbpostgresql
@@ -53,6 +54,7 @@ services:
# DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
# #KAFKA_BOOTSTRAP_SERVER: kafka:9092
# #notification.data-updated.enabled: 'true'
+ # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
# restart: unless-stopped
# depends_on:
# - dbpostgresql
@@ -73,6 +75,7 @@ services:
DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
#KAFKA_BOOTSTRAP_SERVER: kafka:9092
#notification.data-updated.enabled: 'true'
+ #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
restart: unless-stopped
depends_on:
- dbpostgresql