diff options
author | Renu Kumari <renu.kumari@bell.ca> | 2021-08-19 13:11:00 -0400 |
---|---|---|
committer | Renu Kumari <renu.kumari@bell.ca> | 2021-08-24 09:39:45 -0400 |
commit | 86c74c79cb45992d9f2ec134477cae41cd26651b (patch) | |
tree | 4bc17c8dca351910f0f411a6007f0db6924af5ab /cps-service | |
parent | 888dcd495ecb63bf678e7234e9dc34e0743cb412 (diff) |
Process data-updated event asynchronously
- notification is processed asynchronously using defined threadpool
- updated docker-compose and readme to add dataspace filtering variables
Issue-ID: CPS-526
Signed-off-by: Renu Kumari <renu.kumari@bell.ca>
Change-Id: I7f827250f45cb9e3db2f060e9b3a089a4eaee05c
Diffstat (limited to 'cps-service')
7 files changed, 138 insertions, 32 deletions
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java index 5e6e1a2687..a512f67baf 100755 --- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java +++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java @@ -23,6 +23,7 @@ package org.onap.cps.api.impl; import java.util.Collection; +import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsAdminService; import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; @@ -39,6 +40,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service +@Slf4j public class CpsDataServiceImpl implements CpsDataService { private static final String ROOT_NODE_XPATH = "/"; @@ -62,7 +64,7 @@ public class CpsDataServiceImpl implements CpsDataService { public void saveData(final String dataspaceName, final String anchorName, final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData); cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -70,7 +72,7 @@ public class CpsDataServiceImpl implements CpsDataService { final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -79,7 +81,7 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<DataNode> dataNodesCollection = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.addListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodesCollection); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -94,7 +96,7 @@ public class CpsDataServiceImpl implements CpsDataService { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves()); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -102,7 +104,7 @@ public class CpsDataServiceImpl implements CpsDataService { final String jsonData) { final var dataNode = buildDataNodeFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override @@ -111,13 +113,13 @@ public class CpsDataServiceImpl implements CpsDataService { final Collection<DataNode> dataNodes = buildDataNodeCollectionFromJson(dataspaceName, anchorName, parentNodeXpath, jsonData); cpsDataPersistenceService.replaceListDataNodes(dataspaceName, anchorName, parentNodeXpath, dataNodes); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @Override public void deleteListNodeData(final String dataspaceName, final String anchorName, final String listNodeXpath) { cpsDataPersistenceService.deleteListDataNodes(dataspaceName, anchorName, listNodeXpath); - notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + processDataUpdatedEventAsync(dataspaceName, anchorName); } @@ -157,6 +159,14 @@ public class CpsDataServiceImpl implements CpsDataService { } + private void processDataUpdatedEventAsync(final String dataspaceName, final String anchorName) { + try { + notificationService.processDataUpdatedEvent(dataspaceName, anchorName); + } catch (final Exception exception) { + log.error("Failed to send message to notification service", exception); + } + } + private SchemaContext getSchemaContext(final String dataspaceName, final String schemaSetName) { return yangTextSchemaSourceSetCache.get(dataspaceName, schemaSetName).getSchemaContext(); } diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java new file mode 100644 index 0000000000..3ae675e214 --- /dev/null +++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2021 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.config; + +import javax.validation.constraints.Min; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.validation.annotation.Validated; + +@EnableAsync +@Configuration +@ConfigurationProperties("notification.async-executor") +@Validated +@Setter +public class AsyncConfig { + + @Min(0) + private int corePoolSize = 2; + @Min(2) + private int maxPoolSize = 10; + @Min(0) + private int queueCapacity = 2147483647; + private boolean waitForTasksToCompleteOnShutdown = true; + private String threadNamePrefix = "Async-"; + + /** + * Creates TaskExecutor for processing data-updated events. + * + * @return TaskExecutor + */ + @Bean("notificationExecutor") + public TaskExecutor getThreadAsyncExecutorForNotification() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown); + executor.setThreadNamePrefix(threadNamePrefix); + return executor; + } + +} diff --git a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java index e1473d3ff9..6b7f5a89be 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java +++ b/cps-service/src/main/java/org/onap/cps/notification/CpsDataUpdatedEventFactory.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -36,14 +38,14 @@ import org.onap.cps.utils.DataMapUtils; import org.springframework.stereotype.Component; @Component -class CpsDataUpdatedEventFactory { +public class CpsDataUpdatedEventFactory { private static final URI EVENT_SOURCE; private static final String EVENT_TYPE = "org.onap.cps.data-updated-event"; private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - static { + static { try { EVENT_SOURCE = new URI("urn:cps:org.onap.cps"); } catch (final URISyntaxException e) { @@ -60,7 +62,14 @@ class CpsDataUpdatedEventFactory { this.cpsAdminService = cpsAdminService; } - CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) { + /** + * Generates CPS Data Updated event. + * + * @param dataspaceName dataspaceName + * @param anchorName anchorName + * @return CpsDataUpdatedEvent + */ + public CpsDataUpdatedEvent createCpsDataUpdatedEvent(final String dataspaceName, final String anchorName) { final var dataNode = cpsDataService .getDataNode(dataspaceName, anchorName, "/", FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); final var anchor = cpsAdminService.getAnchor(dataspaceName, anchorName); diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java index 1ab032b57c..2d87488245 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationPublisher.java @@ -1,12 +1,13 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. + * Copyright (c) 2021 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,7 +55,7 @@ public class NotificationPublisher { * * @param cpsDataUpdatedEvent event to be sent to kafka */ - void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) { + public void sendNotification(@NonNull final CpsDataUpdatedEvent cpsDataUpdatedEvent) { final var messageKey = cpsDataUpdatedEvent.getContent().getDataspaceName() + "," + cpsDataUpdatedEvent.getContent().getAnchorName(); log.debug("Data Updated event is being sent with messageKey: '{}' & body : {} ", diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java index 9cb2c52e01..9ed8b65ad7 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationService.java @@ -6,13 +6,15 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ @@ -21,10 +23,12 @@ package org.onap.cps.notification; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @@ -45,12 +49,12 @@ public class NotificationService { * @param cpsDataUpdatedEventFactory to create CPSDataUpdatedEvent * @param notificationErrorHandler error handler */ - @Autowired public NotificationService( final NotificationProperties notificationProperties, final NotificationPublisher notificationPublisher, final CpsDataUpdatedEventFactory cpsDataUpdatedEventFactory, final NotificationErrorHandler notificationErrorHandler) { + log.info("Notification Properties {}", notificationProperties); this.notificationProperties = notificationProperties; this.notificationPublisher = notificationPublisher; this.cpsDataUpdatedEventFactory = cpsDataUpdatedEventFactory; @@ -76,7 +80,8 @@ public class NotificationService { * @param dataspaceName dataspace name * @param anchorName anchor name */ - public void processDataUpdatedEvent(final String dataspaceName, final String anchorName) { + @Async("notificationExecutor") + public Future<Void> processDataUpdatedEvent(final String dataspaceName, final String anchorName) { log.debug("process data updated event for dataspace '{}' & anchor '{}'", dataspaceName, anchorName); try { if (shouldSendNotification(dataspaceName)) { @@ -92,6 +97,7 @@ public class NotificationService { notificationErrorHandler.onException("Failed to process cps-data-updated-event.", exception, dataspaceName, anchorName); } + return CompletableFuture.completedFuture(null); } /* diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy index b60d093236..0a2d3d9988 100644 --- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy @@ -1,12 +1,13 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Bell Canada. All rights reserved. + * Copyright (c) 2021 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,30 +20,35 @@ package org.onap.cps.notification +import org.onap.cps.config.AsyncConfig import org.onap.cps.event.model.CpsDataUpdatedEvent import org.spockframework.spring.SpringBean +import org.spockframework.spring.SpringSpy import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.test.context.SpringBootTest +import org.springframework.scheduling.annotation.EnableAsync import org.springframework.test.context.ContextConfiguration import spock.lang.Shared import spock.lang.Specification @SpringBootTest +@EnableAsync @EnableConfigurationProperties -@ContextConfiguration(classes = [NotificationProperties]) +@ContextConfiguration(classes = [NotificationProperties, NotificationService, NotificationErrorHandler, AsyncConfig]) class NotificationServiceSpec extends Specification { @SpringBean NotificationPublisher mockNotificationPublisher = Mock() @SpringBean - NotificationErrorHandler spyNotificationErrorHandler = Spy(new NotificationErrorHandler()) - @SpringBean CpsDataUpdatedEventFactory mockCpsDataUpdatedEventFactory = Mock() + @SpringSpy + NotificationErrorHandler spyNotificationErrorHandler + @SpringSpy + NotificationProperties spyNotificationProperties @Autowired - NotificationProperties notificationProperties - NotificationProperties spyNotificationProperties + NotificationService objectUnderTest @Shared def myDataspacePublishedName = 'my-dataspace-published' @@ -50,7 +56,7 @@ class NotificationServiceSpec extends Specification { def 'Skip sending notification when disabled.'() { given: 'notification is disabled' - def objectUnderTest = createNotificationService(false) + spyNotificationProperties.isEnabled() >> false when: 'dataUpdatedEvent is received' objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) then: 'the notification is not sent' @@ -59,12 +65,14 @@ class NotificationServiceSpec extends Specification { def 'Send notification when enabled: #scenario.'() { given: 'notification is enabled' - def objectUnderTest = createNotificationService(true) + spyNotificationProperties.isEnabled() >> true and: 'event factory can create event successfully' def cpsDataUpdatedEvent = new CpsDataUpdatedEvent() mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(dataspaceName, myAnchorName) >> cpsDataUpdatedEvent when: 'dataUpdatedEvent is received' - objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName) + def future = objectUnderTest.processDataUpdatedEvent(dataspaceName, myAnchorName) + and: 'async processing is completed' + future.get() then: 'notification is sent' expectedSendNotificationCount * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent) where: @@ -75,12 +83,14 @@ class NotificationServiceSpec extends Specification { def 'Error handling in notification service.'() { given: 'notification is enabled' - def objectUnderTest = createNotificationService(true) + spyNotificationProperties.isEnabled() >> true and: 'event factory can not create event successfully' mockCpsDataUpdatedEventFactory.createCpsDataUpdatedEvent(myDataspacePublishedName, myAnchorName) >> { throw new Exception("Could not create event") } when: 'event is sent for processing' - objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) + def future = objectUnderTest.processDataUpdatedEvent(myDataspacePublishedName, myAnchorName) + and: 'async processing is completed' + future.get() then: 'error is handled and not thrown to caller' notThrown Exception 1 * spyNotificationErrorHandler.onException(_, _, _, _) diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml index 94f7e818f4..b1546d74f3 100644 --- a/cps-service/src/test/resources/application.yml +++ b/cps-service/src/test/resources/application.yml @@ -1,11 +1,12 @@ # ============LICENSE_START======================================================= -# Copyright (C) 2021 Bell Canada. All rights reserved. +# Copyright (c) 2021 Bell Canada. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,6 +22,10 @@ notification: enabled-dataspaces: ".*-published,.*-important" enabled: true topic: cps-event + async-executor: + core-pool-size: 2 + max-pool-size: 10 + queue-capacity: 0 spring: kafka: |