diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
3 files changed, 123 insertions, 26 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java index 5e26650eda..597e2ba8e5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; @@ -52,28 +53,35 @@ public class ModuleSyncTasks { * Perform module sync on a batch of cm handles. * * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @param batchCounter the number of batches currently being processed, will be decreased when task is finished + * or fails * @return completed future to handle post-processing */ - public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) { - final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); - for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { - final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); - final YangModelCmHandle yangModelCmHandle = - YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(cmHandleId); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); - } catch (final Exception e) { - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); - setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes, + final AtomicInteger batchCounter) { + try { + final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); + for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { + final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + final YangModelCmHandle yangModelCmHandle = + YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(cmHandleId); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); + } catch (final Exception e) { + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + } + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); } - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + updateCmHandlesStateBatch(cmHandelStatePerCmHandle); + } finally { + batchCounter.getAndDecrement(); } - updateCmHandlesStateBatch(cmHandelStatePerCmHandle); return COMPLETED_FUTURE; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 8074fe6fe1..73954c36b3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -27,36 +27,50 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor; import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor -@Component +@Service public class ModuleSyncWatchdog { private final SyncUtils syncUtils; private final BlockingQueue<DataNode> moduleSyncWorkQueue; private final Map<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; - + private final AsyncTaskExecutor asyncTaskExecutor; private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; + private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5); + @Getter + private AtomicInteger batchCounter = new AtomicInteger(1); /** - * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. + * Check DB for any cm handles in 'ADVISED' state. + * Queue and create batches to process them asynchronously. + * This method will only finish when there are no more 'ADVISED' cm handles in the DB. + * This method wil be triggered on a configurable interval */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { populateWorkQueueIfNeeded(); - while (!moduleSyncWorkQueue.isEmpty()) { + final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); + while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) { + batchCounter.getAndIncrement(); final Collection<DataNode> nextBatch = prepareNextBatch(); - moduleSyncTasks.performModuleSync(nextBatch); + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS + ); preventBusyWait(); } } @@ -71,8 +85,6 @@ public class ModuleSyncWatchdog { } private void preventBusyWait() { - // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution - // but leaving here to minimize impacts on this class for that Jira try { TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java new file mode 100644 index 0000000000..7b4d2cfaa9 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutor.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync.executor; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import javax.annotation.PostConstruct; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class AsyncTaskExecutor { + + @Value("${modules-sync-watchdog.async-executor.parallelism-level:10}") + @Getter + private int asyncTaskParallelismLevel; + private ExecutorService executorService; + private static final int DEFAULT_PARALLELISM_LEVEL = 10; + + /** + * Set up executor service with thread-pool size as per configuration parameter. + * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied. + */ + @PostConstruct + public void setupThreadPool() { + executorService = Executors.newWorkStealingPool( + asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel); + } + + /** + * Execute supplied task asynchronously. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param timeOutInMillis the task timeout value in milliseconds + */ + public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { + CompletableFuture.supplyAsync(taskSupplier::get, executorService) + .orTimeout(timeOutInMillis, MILLISECONDS) + .whenCompleteAsync(this::handleTaskCompletion); + } + + private void handleTaskCompletion(final Object response, final Throwable throwable) { + if (throwable != null) { + if (throwable instanceof TimeoutException) { + log.warn("Async task didn't completed within the required time."); + } else { + log.debug("Watchdog async batch failed. caused by : {}", throwable.getMessage()); + } + } + } +} |