aboutsummaryrefslogtreecommitdiffstats
path: root/cps-service/src/main
diff options
context:
space:
mode:
authorRenu Kumari <renu.kumari@bell.ca>2021-08-19 13:11:00 -0400
committerRenu Kumari <renu.kumari@bell.ca>2021-08-24 09:39:45 -0400
commit86c74c79cb45992d9f2ec134477cae41cd26651b (patch)
tree4bc17c8dca351910f0f411a6007f0db6924af5ab /cps-service/src/main
parent888dcd495ecb63bf678e7234e9dc34e0743cb412 (diff)
Process data-updated event asynchronously
- notification is processed asynchronously using defined threadpool - updated docker-compose and readme to add dataspace filtering variables Issue-ID: CPS-526 Signed-off-by: Renu Kumari <renu.kumari@bell.ca> Change-Id: I7f827250f45cb9e3db2f060e9b3a089a4eaee05c
Diffstat (limited to 'cps-service/src/main')
-rwxr-xr-xcps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java24
-rw-r--r--cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java65
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java17
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java7
-rw-r--r--cps-service/src/main/java/org/onap/cps/notification/NotificationService.java14
5 files changed, 109 insertions, 18 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index 5e6e1a2687..a512f67baf 100755
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -23,6 +23,7 @@
package org.onap.cps.api.impl;
import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsAdminService;
import org.onap.cps.api.CpsDataService;
import org.onap.cps.api.CpsModuleService;
@@ -39,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
+@Slf4j
public class CpsDataServiceImpl implements CpsDataService {
private static final String ROOT_NODE_XPATH = "/";
@@ -62,7 +64,7 @@ public class CpsDataServiceImpl implements CpsDataService {
public void saveData(final String dataspaceName, final String anchorName, final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData);
cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -70,7 +72,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -79,7 +81,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<DataNode> dataNodesCollection =
buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.addListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodesCollection);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -94,7 +96,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService
.updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -102,7 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService {
final String jsonData) {
final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
@@ -111,13 +113,13 @@ public class CpsDataServiceImpl implements CpsDataService {
final Collection<DataNode> dataNodes =
buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData);
cpsDataPersistenceService.replaceListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@Override
public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath) {
cpsDataPersistenceService.deleteListDataNodes(dataspaceName, anchorName, listNodeXpath);
- notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ processDataUpdatedEventAsync(dataspaceName, anchorName);
}
@@ -157,6 +159,14 @@ public class CpsDataServiceImpl implements CpsDataService {
}
+ private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName) {
+ try {
+ notificationService.processDataUpdatedEvent(dataspaceName, anchorName);
+ } catch (final Exception exception) {
+ log.error("Failed to send message to notification service", exception);
+ }
+ }
+
private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) {
return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext();
}
diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
new file mode 100644
index 0000000000..3ae675e214
--- /dev/null
+++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2021 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.config;
+
+import javax.validation.constraints.Min;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.validation.annotation.Validated;
+
+@EnableAsync
+@Configuration
+@ConfigurationProperties("notification.async-executor")
+@Validated
+@Setter
+public class AsyncConfig {
+
+ @Min(0)
+ private int corePoolSize = 2;
+ @Min(2)
+ private int maxPoolSize = 10;
+ @Min(0)
+ private int queueCapacity = 2147483647;
+ private boolean waitForTasksToCompleteOnShutdown = true;
+ private String threadNamePrefix = "Async-";
+
+ /**
+ * Creates TaskExecutor for processing data-updated events.
+ *
+ * @return TaskExecutor
+ */
+ @Bean("notificationExecutor")
+ public TaskExecutor getThreadAsyncExecutorForNotification() {
+ final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(corePoolSize);
+ executor.setMaxPoolSize(maxPoolSize);
+ executor.setQueueCapacity(queueCapacity);
+ executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
+ executor.setThreadNamePrefix(threadNamePrefix);
+ return executor;
+ }
+
+}
diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
index e1473d3ff9..6b7f5a89be 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -36,14 +38,14 @@ import org.onap.cps.utils.DataMapUtils;
import org.springframework.stereotype.Component;
@Component
-class CpsDataUpdatedEventFactory {
+public class CpsDataUpdatedEventFactory {
private static final URI EVENT_SOURCE;
private static final String EVENT_TYPE = "org.onap.cps.data-updated-event";
private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
- static {
+ static {
try {
EVENT_SOURCE = new URI("urn:cps:org.onap.cps");
} catch (final URISyntaxException e) {
@@ -60,7 +62,14 @@ class CpsDataUpdatedEventFactory {
this.cpsAdminService = cpsAdminService;
}
- CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) {
+ /**
+ * Generates CPS Data Updated event.
+ *
+ * @param dataspaceName dataspaceName
+ * @param anchorName anchorName
+ * @return CpsDataUpdatedEvent
+ */
+ public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) {
final var dataNode = cpsDataService
.getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
index 1ab032b57c..2d87488245 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java
@@ -1,12 +1,13 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Bell Canada. All rights reserved.
+ * Copyright (c) 2021 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +55,7 @@ public class NotificationPublisher {
*
* @param cpsDataUpdatedEvent event to be sent to kafka
*/
- void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
+ public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) {
final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + ","
+ cpsDataUpdatedEvent.getContent().getAnchorName();
log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ",
diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
index 9cb2c52e01..9ed8b65ad7 100644
--- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
+++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java
@@ -6,13 +6,15 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -21,10 +23,12 @@ package org.onap.cps.notification;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@@ -45,12 +49,12 @@ public class NotificationService {
* @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent
* @param notificationErrorHandler error handler
*/
- @Autowired
public NotificationService(
final NotificationProperties notificationProperties,
final NotificationPublisher notificationPublisher,
final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory,
final NotificationErrorHandler notificationErrorHandler) {
+ log.info("Notification Properties {}", notificationProperties);
this.notificationProperties = notificationProperties;
this.notificationPublisher = notificationPublisher;
this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory;
@@ -76,7 +80,8 @@ public class NotificationService {
* @param dataspaceName dataspace name
* @param anchorName anchor name
*/
- public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
+ @Async("notificationExecutor")
+ public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) {
log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName);
try {
if (shouldSendNotification(dataspaceName)) {
@@ -92,6 +97,7 @@ public class NotificationService {
notificationErrorHandler.onException("Failed to process cps-data-updated-event.",
exception, dataspaceName, anchorName);
}
+ return CompletableFuture.completedFuture(null);
}
/*