diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 8074fe6fe..73954c36b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -27,36 +27,50 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor; import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; @Slf4j @RequiredArgsConstructor -@Component +@Service public class ModuleSyncWatchdog { private final SyncUtils syncUtils; private final BlockingQueue<DataNode> moduleSyncWorkQueue; private final Map<String, Object> moduleSyncStartedOnCmHandles; private final ModuleSyncTasks moduleSyncTasks; - + private final AsyncTaskExecutor asyncTaskExecutor; private static final int MODULE_SYNC_BATCH_SIZE = 100; private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; + private static final long ASYNC_TASK_TIMEOUT_IN_MILLISECONDS = TimeUnit.MINUTES.toMillis(5); + @Getter + private AtomicInteger batchCounter = new AtomicInteger(1); /** - * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. + * Check DB for any cm handles in 'ADVISED' state. + * Queue and create batches to process them asynchronously. + * This method will only finish when there are no more 'ADVISED' cm handles in the DB. + * This method wil be triggered on a configurable interval */ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") public void moduleSyncAdvisedCmHandles() { populateWorkQueueIfNeeded(); - while (!moduleSyncWorkQueue.isEmpty()) { + final int asyncTaskParallelismLevel = asyncTaskExecutor.getAsyncTaskParallelismLevel(); + while (!moduleSyncWorkQueue.isEmpty() && batchCounter.get() <= asyncTaskParallelismLevel) { + batchCounter.getAndIncrement(); final Collection<DataNode> nextBatch = prepareNextBatch(); - moduleSyncTasks.performModuleSync(nextBatch); + asyncTaskExecutor.executeTask(() -> + moduleSyncTasks.performModuleSync(nextBatch, batchCounter), + ASYNC_TASK_TIMEOUT_IN_MILLISECONDS + ); preventBusyWait(); } } @@ -71,8 +85,6 @@ public class ModuleSyncWatchdog { } private void preventBusyWait() { - // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution - // but leaving here to minimize impacts on this class for that Jira try { TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { |