aboutsummaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap
diff options
context:
space:
mode:
authordanielhanrahan <daniel.hanrahan@est.tech>2025-01-29 18:35:15 +0000
committerdanielhanrahan <daniel.hanrahan@est.tech>2025-02-04 10:24:58 +0000
commite08721866f0792ea830d05c415758ce98a7a9b14 (patch)
tree6fac984083dc3253c0d91cc3dc1f65d9c947fa37 /cps-ncmp-service/src/main/java/org/onap
parent59f1cc4c5994da34f2c48a2347499f9cfb7bd836 (diff)
Remove multithreading from module sync watchdog
After introduction of module set tag improvements, there is no need to multithreading in module sync. Performance impact is minimal. Issue-ID: CPS-2165 Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech> Change-Id: I1557fc8348d39da3654a1b92944c6ad49fa8670d
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java77
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java11
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java38
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java35
4 files changed, 7 insertions, 154 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
deleted file mode 100644
index 80bc4ab69f..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-
-import jakarta.annotation.PostConstruct;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-@Slf4j
-@Service
-public class AsyncTaskExecutor {
-
- @Value("${ncmp.modules-sync-watchdog.async-executor.parallelism-level:10}")
- @Getter
- private int asyncTaskParallelismLevel;
- private ExecutorService executorService;
- private static final int DEFAULT_PARALLELISM_LEVEL = 10;
-
- /**
- * Set up executor service with thread-pool size as per configuration parameter.
- * If modules-sync-watchdog.async-executor.parallelism-level not set a default of 10 threads will be applied.
- */
- @PostConstruct
- public void setupThreadPool() {
- executorService = Executors.newWorkStealingPool(
- asyncTaskParallelismLevel == 0 ? DEFAULT_PARALLELISM_LEVEL : asyncTaskParallelismLevel);
- }
-
- /**
- * Execute supplied task asynchronously.
- *
- * @param taskSupplier functional method is get() task need to executed asynchronously
- * @param timeOutInMillis the task timeout value in milliseconds
- */
- public void executeTask(final Supplier<Object> taskSupplier, final long timeOutInMillis) {
- CompletableFuture.supplyAsync(taskSupplier::get, executorService)
- .orTimeout(timeOutInMillis, MILLISECONDS)
- .whenCompleteAsync(this::handleTaskCompletion);
- }
-
- private void handleTaskCompletion(final Object response, final Throwable throwable) {
- if (throwable != null) {
- if (throwable instanceof TimeoutException) {
- log.error("Async task didn't complete within the required time.", throwable);
- } else {
- log.error("Watchdog async batch failed.", throwable);
- }
- }
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
index f039cf3c02..b727e79e70 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java
@@ -24,8 +24,6 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.exceptions.DataNodeNotFoundException;
@@ -51,12 +49,8 @@ public class ModuleSyncTasks {
* Perform module sync on a batch of cm handles.
*
* @param cmHandleIds a batch of cm handle ids to perform module sync on
- * @param batchCounter the number of batches currently being processed, will be decreased when
- * task is finished or fails
- * @return completed future to handle post-processing
*/
- public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
- final AtomicInteger batchCounter) {
+ public void performModuleSync(final Collection<String> cmHandleIds) {
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
for (final String cmHandleId : cmHandleIds) {
@@ -74,11 +68,8 @@ public class ModuleSyncTasks {
}
}
} finally {
- batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
- log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get());
}
- return CompletableFuture.completedFuture(null);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 32e1c49f17..6eefedb633 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -27,13 +27,9 @@ import com.hazelcast.map.IMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -46,16 +42,10 @@ public class ModuleSyncWatchdog {
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
- private final AsyncTaskExecutor asyncTaskExecutor;
private final IMap<String, String> cpsAndNcmpLock;
- private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
- private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
- private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5);
- @Getter
- private AtomicInteger batchCounter = new AtomicInteger(1);
/**
* Check DB for any cm handles in 'ADVISED' state.
@@ -69,18 +59,11 @@ public class ModuleSyncWatchdog {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
- if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
- final Collection<String> nextBatch = prepareNextBatch();
- log.info("Processing module sync batch of {}. {} batch(es) active.",
- nextBatch.size(), batchCounter.get());
- if (!nextBatch.isEmpty()) {
- asyncTaskExecutor.executeTask(() ->
- moduleSyncTasks.performModuleSync(nextBatch, batchCounter),
- ASYNC_TASK_TIMEOUT_IN_MILLISECONDS);
- batchCounter.getAndIncrement();
- }
- } else {
- preventBusyWait();
+ final Collection<String> nextBatch = prepareNextBatch();
+ if (!nextBatch.isEmpty()) {
+ log.info("Processing module sync batch of {}. 1 batch(es) active.", nextBatch.size());
+ moduleSyncTasks.performModuleSync(nextBatch);
+ log.info("Processing module sync batch finished. 0 batch(es) active.");
}
}
}
@@ -153,13 +136,4 @@ public class ModuleSyncWatchdog {
log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
-
- private void preventBusyWait() {
- try {
- log.debug("Busy waiting now");
- sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
deleted file mode 100644
index 7a02fa06e0..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.utils;
-
-import java.util.concurrent.TimeUnit;
-import org.springframework.stereotype.Service;
-
-/**
- * This class is to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
- */
-@Service
-public class Sleeper {
- public void haveALittleRest(final long timeInMillis) throws InterruptedException {
- TimeUnit.MILLISECONDS.sleep(timeInMillis);
- }
-}