diff options
author | 2025-01-29 18:35:15 +0000 | |
---|---|---|
committer | 2025-02-04 10:24:58 +0000 | |
commit | e08721866f0792ea830d05c415758ce98a7a9b14 (patch) | |
tree | 6fac984083dc3253c0d91cc3dc1f65d9c947fa37 /cps-ncmp-service/src/main/java/org/onap | |
parent | 59f1cc4c5994da34f2c48a2347499f9cfb7bd836 (diff) |
Remove multithreading from module sync watchdog
After introduction of module set tag improvements, there is no need
to multithreading in module sync. Performance impact is minimal.
Issue-ID: CPS-2165
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I1557fc8348d39da3654a1b92944c6ad49fa8670d
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap')
4 files changed, 7 insertions, 154 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java deleted file mode 100644 index 80bc4ab69f..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.inventory.sync; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import jakarta.annotation.PostConstruct; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; - -@Slf4j -@Service -public class AsyncTaskExecutor { - - @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}") - @Getter - private int asyncTaskParallelismLevel; - private ExecutorService executorService; - private static final int DEFAULT_PARALLELISM_LEVEL = 10; - - /** - * Set up executor service with thread-pool size as per configuration parameter. - * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied. - */ - @PostConstruct - public void setupThreadPool() { - executorService = Executors.newWorkStealingPool( - asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel); - } - - /** - * Execute supplied task asynchronously. - * - * @param taskSupplier functional method is get() task need to executed asynchronously - * @param timeOutInMillis the task timeout value in milliseconds - */ - public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) { - CompletableFuture.supplyAsync(taskSupplier::get, executorService) - .orTimeout(timeOutInMillis, MILLISECONDS) - .whenCompleteAsync(this::handleTaskCompletion); - } - - private void handleTaskCompletion(final Object response, final Throwable throwable) { - if (throwable != null) { - if (throwable instanceof TimeoutException) { - log.error("Async task didn't complete within the required time.", throwable); - } else { - log.error("Watchdog async batch failed.", throwable); - } - } - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index f039cf3c02..b727e79e70 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -24,8 +24,6 @@ import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.exceptions.DataNodeNotFoundException; @@ -51,12 +49,8 @@ public class ModuleSyncTasks { * Perform module sync on a batch of cm handles. * * @param cmHandleIds a batch of cm handle ids to perform module sync on - * @param batchCounter the number of batches currently being processed, will be decreased when - * task is finished or fails - * @return completed future to handle post-processing */ - public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds, - final AtomicInteger batchCounter) { + public void performModuleSync(final Collection<String> cmHandleIds) { final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size()); try { for (final String cmHandleId : cmHandleIds) { @@ -74,11 +68,8 @@ public class ModuleSyncTasks { } } } finally { - batchCounter.getAndDecrement(); lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); - log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get()); } - return CompletableFuture.completedFuture(null); } /** diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java index 32e1c49f17..6eefedb633 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation + * Copyright (C) 2022-2025 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,13 +27,9 @@ import com.hazelcast.map.IMap; import java.util.Collection; import java.util.HashSet; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; -import org.onap.cps.ncmp.impl.utils.Sleeper; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -46,16 +42,10 @@ public class ModuleSyncWatchdog { private final BlockingQueue<String> moduleSyncWorkQueue; private final IMap<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; - private final AsyncTaskExecutor asyncTaskExecutor; private final IMap<String, String> cpsAndNcmpLock; - private final Sleeper sleeper; private static final int MODULE_SYNC_BATCH_SIZE = 100; - private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; - private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5); - @Getter - private AtomicInteger batchCounter = new AtomicInteger(1); /** * Check DB for any cm handles in 'ADVISED' state. @@ -69,18 +59,11 @@ public class ModuleSyncWatchdog { log.debug("Processing module sync watchdog waking up."); populateWorkQueueIfNeeded(); while (!moduleSyncWorkQueue.isEmpty()) { - if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) { - final Collection<String> nextBatch = prepareNextBatch(); - log.info("Processing module sync batch of {}. {} batch(es) active.", - nextBatch.size(), batchCounter.get()); - if (!nextBatch.isEmpty()) { - asyncTaskExecutor.executeTask(() -> - moduleSyncTasks.performModuleSync(nextBatch, batchCounter), - ASYNC_TASK_TIMEOUT_IN_MILLISECONDS); - batchCounter.getAndIncrement(); - } - } else { - preventBusyWait(); + final Collection<String> nextBatch = prepareNextBatch(); + if (!nextBatch.isEmpty()) { + log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size()); + moduleSyncTasks.performModuleSync(nextBatch); + log.info("Processing module sync batch finished. 0 batch(es) active."); } } } @@ -153,13 +136,4 @@ public class ModuleSyncWatchdog { log.info("nextBatch size : {}", nextBatch.size()); return nextBatch; } - - private void preventBusyWait() { - try { - log.debug("Busy waiting now"); - sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } - } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java deleted file mode 100644 index 7a02fa06e0..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.utils; - -import java.util.concurrent.TimeUnit; -import org.springframework.stereotype.Service; - -/** - * This class is to extract out sleep functionality so the interrupted exception handling can - * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage. - */ -@Service -public class Sleeper { - public void haveALittleRest(final long timeInMillis) throws InterruptedException { - TimeUnit.MILLISECONDS.sleep(timeInMillis); - } -} |