diff options
author | Sourabh Sourabh <sourabh.sourabh@est.tech> | 2022-08-29 12:27:55 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2022-08-29 12:27:55 +0000 |
commit | bf1fdbdeed0ba6f0e8420d15ab33d6c88a5ee4d5 (patch) | |
tree | 292e45d1ae93cfcc4595bdf9251561226706d979 /cps-ncmp-service/src/main/java/org | |
parent | 6abd69ad61216db6c994bfec662ec46685b871c6 (diff) | |
parent | 6b2cdc63c02cc2d1a9bc9d43997ab14e740d7973 (diff) |
Merge "Performance Improvement: Use hazelcast blocking queue"
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
7 files changed, 299 insertions, 161 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java new file mode 100644 index 0000000000..abde4c2d54 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.config.embeddedcache; + +import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.NamedConfig; +import com.hazelcast.config.QueueConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import org.onap.cps.spi.model.DataNode; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases. + */ +@Configuration +public class SynchronizationCacheConfig { + + private static final QueueConfig commonQueueConfig = createQueueConfig(); + private static final MapConfig moduleSyncStartedConfig = + createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1)); + private static final MapConfig dataSyncSemaphoresConfig = + createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30)); + + /** + * Module Sync Distributed Queue Instance. + * + * @return queue of cm handles (data nodes) that need module sync + */ + @Bean + public BlockingQueue<DataNode> moduleSyncWorkQueue() { + return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig) + .getQueue("moduleSyncWorkQueue"); + } + + /** + * Module Sync started (and maybe finished) on cm handles (ids). + * + * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed + */ + @Bean + public Map<String, Object> moduleSyncStartedOnCmHandles() { + return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig) + .getMap("moduleSyncStartedOnCmHandles"); + } + + /** + * Data Sync Distributed Map Instance. + * + * @return configured map of data sync semaphores + */ + @Bean + public Map<String, Boolean> dataSyncSemaphores() { + return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig) + .getMap("dataSyncSemaphores"); + } + + private HazelcastInstance createHazelcastInstance( + final String hazelcastInstanceName, final NamedConfig namedConfig) { + return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig)); + } + + private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) { + final Config config = new Config(instanceName); + if (namedConfig instanceof MapConfig) { + config.addMapConfig((MapConfig) namedConfig); + } + if (namedConfig instanceof QueueConfig) { + config.addQueueConfig((QueueConfig) namedConfig); + } + config.setClusterName("synchronization-caches"); + return config; + } + + private static QueueConfig createQueueConfig() { + final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig"); + commonQueueConfig.setBackupCount(3); + commonQueueConfig.setAsyncBackupCount(3); + return commonQueueConfig; + } + + private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) { + final MapConfig mapConfig = new MapConfig(configName); + mapConfig.setBackupCount(3); + mapConfig.setAsyncBackupCount(3); + mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds); + return mapConfig; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java deleted file mode 100644 index 571558ac0c..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * ===========LICENSE_START======================================================== - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.config.embeddedcache; - -import com.hazelcast.config.Config; -import com.hazelcast.config.MapConfig; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases. - */ -@Configuration -public class SynchronizationSemaphoresConfig { - - private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30); - - /** - * Module Sync Distributed Map Instance. - * - * @return configured map of module sync semaphores - */ - @Bean - public ConcurrentMap<String, Boolean> moduleSyncSemaphores() { - return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig") - .getMap("moduleSyncSemaphores"); - } - - /** - * Data Sync Distributed Map Instance. - * - * @return configured map of data sync semaphores - */ - @Bean - public ConcurrentMap<String, Boolean> dataSyncSemaphores() { - return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig") - .getMap("dataSyncSemaphores"); - } - - private HazelcastInstance createHazelcastInstance( - final String hazelcastInstanceName, final String configMapName) { - return Hazelcast.newHazelcastInstance( - initializeDefaultMapConfig(hazelcastInstanceName, configMapName)); - } - - private Config initializeDefaultMapConfig(final String instanceName, final String configName) { - final Config config = new Config(instanceName); - final MapConfig mapConfig = new MapConfig(configName); - mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS); - mapConfig.setBackupCount(3); - mapConfig.setAsyncBackupCount(3); - config.addMapConfig(mapConfig); - config.setClusterName("synchronization-semaphores"); - return config; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java index 45ba078044..107f8a04bb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java @@ -21,7 +21,7 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.time.OffsetDateTime; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -29,7 +29,6 @@ import org.onap.cps.api.CpsDataService; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -47,8 +46,7 @@ public class DataSyncWatchdog { private final SyncUtils syncUtils; - @Qualifier("dataSyncSemaphores") - private final ConcurrentMap<String, Boolean> dataSyncSemaphores; + private final Map<String, Boolean> dataSyncSemaphores; /** * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java index c574aa61d9..7f61c476d5 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java @@ -86,19 +86,17 @@ public class ModuleSyncService { } /** - * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists. + * Deletes the SchemaSet for schema set id if the SchemaSet Exists. * - * @param yangModelCmHandle the yang model of cm handle. + * @param schemaSetId the schema set id to be deleted */ - public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) { - final String schemaSetAndAnchorName = yangModelCmHandle.getId(); + public void deleteSchemaSetIfExists(final String schemaSetId) { try { - cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName, + cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId, CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED); - log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName); + log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId); } catch (final SchemaSetNotFoundException e) { - log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", - schemaSetAndAnchorName); + log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java new file mode 100644 index 0000000000..5e26650eda --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.inventory.sync; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; +import org.onap.cps.ncmp.api.impl.utils.YangDataConverter; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.CmHandleState; +import org.onap.cps.ncmp.api.inventory.CompositeState; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.api.inventory.LockReasonCategory; +import org.onap.cps.spi.model.DataNode; +import org.springframework.stereotype.Component; + +@RequiredArgsConstructor +@Component +@Slf4j +public class ModuleSyncTasks { + private final InventoryPersistence inventoryPersistence; + private final SyncUtils syncUtils; + private final ModuleSyncService moduleSyncService; + private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + + private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null); + + /** + * Perform module sync on a batch of cm handles. + * + * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on + * @return completed future to handle post-processing + */ + public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) { + final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>(); + for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { + final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + final YangModelCmHandle yangModelCmHandle = + YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId); + final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); + try { + moduleSyncService.deleteSchemaSetIfExists(cmHandleId); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); + } catch (final Exception e) { + syncUtils.updateLockReasonDetailsAndAttempts(compositeState, + LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); + } + log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); + } + updateCmHandlesStateBatch(cmHandelStatePerCmHandle); + return COMPLETED_FUTURE; + } + + /** + * Reset state to "ADVISED" for any previously failed cm handles. + * + * @param failedCmHandles previously failed (locked) cm handles + * @return completed future to handle post-processing + */ + public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) { + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size()); + for (final YangModelCmHandle failedCmHandle : failedCmHandles) { + final CompositeState compositeState = failedCmHandle.getCompositeState(); + final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); + if (isReadyForRetry) { + log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog", + failedCmHandle.getId()); + cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED); + } + } + updateCmHandlesStateBatch(cmHandleStatePerCmHandle); + return COMPLETED_FUTURE; + } + + private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, + final CompositeState.LockReason lockReason) { + advisedCmHandle.getCompositeState().setLockReason(lockReason); + } + + private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) { + // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13) + for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) { + lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue()); + } + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index be811a1147..8074fe6fe1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -21,17 +21,16 @@ package org.onap.cps.ncmp.api.inventory.sync; +import java.util.Collection; +import java.util.HashSet; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; -import org.onap.cps.ncmp.api.inventory.CmHandleState; -import org.onap.cps.ncmp.api.inventory.CompositeState; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.api.inventory.LockReasonCategory; -import org.springframework.beans.factory.annotation.Qualifier; +import org.onap.cps.spi.model.DataNode; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -40,75 +39,75 @@ import org.springframework.stereotype.Component; @Component public class ModuleSyncWatchdog { - private static final boolean MODEL_SYNC_IN_PROGRESS = false; - private static final boolean MODEL_SYNC_DONE = true; - - private final InventoryPersistence inventoryPersistence; - private final SyncUtils syncUtils; + private final BlockingQueue<DataNode> moduleSyncWorkQueue; + private final Map<String, Object> moduleSyncStartedOnCmHandles; + private final ModuleSyncTasks moduleSyncTasks; - private final ModuleSyncService moduleSyncService; - - @Qualifier("moduleSyncSemaphores") - private final ConcurrentMap<String, Boolean> moduleSyncSemaphores; - - private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; + private static final int MODULE_SYNC_BATCH_SIZE = 100; + private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10; + private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started"; /** * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'. */ - @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}") - public void executeAdvisedCmHandlePoll() { - syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> { - final String cmHandleId = advisedCmHandle.getId(); - if (hasPushedIntoSemaphoreMap(cmHandleId)) { - log.debug("executing module sync on {}", cmHandleId); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - try { - moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY); - updateModuleSyncSemaphoreMap(cmHandleId); - } catch (final Exception e) { - syncUtils.updateLockReasonDetailsAndAttempts(compositeState, - LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage()); - setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason()); - } - log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name()); - } else { - log.debug("{} already processed by another instance", cmHandleId); - } - }); - log.debug("No Cm-Handles currently found in an ADVISED state"); + @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}") + public void moduleSyncAdvisedCmHandles() { + populateWorkQueueIfNeeded(); + while (!moduleSyncWorkQueue.isEmpty()) { + final Collection<DataNode> nextBatch = prepareNextBatch(); + moduleSyncTasks.performModuleSync(nextBatch); + preventBusyWait(); + } } /** - * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'. + * Find any failed (locked) cm handles and change state back to 'ADVISED'. */ @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}") - public void executeLockedCmHandlePoll() { - final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); - for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) { - final CompositeState compositeState = lockedCmHandle.getCompositeState(); - final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); - if (isReadyForRetry) { - log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId()); - lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED); - } - } + public void resetPreviouslyFailedCmHandles() { + final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); + moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } - private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, - final CompositeState.LockReason lockReason) { - advisedCmHandle.getCompositeState().setLockReason(lockReason); - lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED); + private void preventBusyWait() { + // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution + // but leaving here to minimize impacts on this class for that Jira + try { + TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } } - private void updateModuleSyncSemaphoreMap(final String cmHandleId) { - moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE); + private void populateWorkQueueIfNeeded() { + if (moduleSyncWorkQueue.isEmpty()) { + final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles(); + for (final DataNode advisedCmHandle : advisedCmHandles) { + if (!moduleSyncWorkQueue.offer(advisedCmHandle)) { + log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); + } + } + } } - private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { - return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null; + private Collection<DataNode> prepareNextBatch() { + final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE); + moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE); + log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size()); + for (final DataNode batchCandidate : nextBatchCandidates) { + final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id")); + final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP + .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP)); + if (alreadyAddedToInProgressMap) { + log.debug("module sync for {} already in progress by other instance", cmHandleId); + } else { + nextBatch.add(batchCandidate); + } + } + log.debug("nextBatch size : {}", nextBatch.size()); + return nextBatch; } + } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java index 16fb8f44d8..537f50122c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java @@ -64,19 +64,14 @@ public class SyncUtils { private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:"); /** - * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing. + * Query data nodes for cm handles with an "ADVISED" cm handle state. * - * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found + * @return cm handles (data nodes) in ADVISED state (empty list if none found) */ - public List<YangModelCmHandle> getAdvisedCmHandles() { - final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>( - cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED)); - log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size()); - if (advisedCmHandlesAsDataNodeList.isEmpty()) { - return Collections.emptyList(); - } - Collections.shuffle(advisedCmHandlesAsDataNodeList); - return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList); + public List<DataNode> getAdvisedCmHandles() { + final List<DataNode> advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED); + log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size()); + return advisedCmHandlesAsDataNodes; } /** |