summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main/java/org/onap')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java113
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java78
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java6
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java14
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java113
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java119
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java17
7 files changed, 299 insertions, 161 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
new file mode 100644
index 0000000000..abde4c2d54
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.NamedConfig;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases.
+ */
+@Configuration
+public class SynchronizationCacheConfig {
+
+ private static final QueueConfig commonQueueConfig = createQueueConfig();
+ private static final MapConfig moduleSyncStartedConfig =
+ createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1));
+ private static final MapConfig dataSyncSemaphoresConfig =
+ createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30));
+
+ /**
+ * Module Sync Distributed Queue Instance.
+ *
+ * @return queue of cm handles (data nodes) that need module sync
+ */
+ @Bean
+ public BlockingQueue<DataNode> moduleSyncWorkQueue() {
+ return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig)
+ .getQueue("moduleSyncWorkQueue");
+ }
+
+ /**
+ * Module Sync started (and maybe finished) on cm handles (ids).
+ *
+ * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed
+ */
+ @Bean
+ public Map<String, Object> moduleSyncStartedOnCmHandles() {
+ return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig)
+ .getMap("moduleSyncStartedOnCmHandles");
+ }
+
+ /**
+ * Data Sync Distributed Map Instance.
+ *
+ * @return configured map of data sync semaphores
+ */
+ @Bean
+ public Map<String, Boolean> dataSyncSemaphores() {
+ return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig)
+ .getMap("dataSyncSemaphores");
+ }
+
+ private HazelcastInstance createHazelcastInstance(
+ final String hazelcastInstanceName, final NamedConfig namedConfig) {
+ return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
+ }
+
+ private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
+ final Config config = new Config(instanceName);
+ if (namedConfig instanceof MapConfig) {
+ config.addMapConfig((MapConfig) namedConfig);
+ }
+ if (namedConfig instanceof QueueConfig) {
+ config.addQueueConfig((QueueConfig) namedConfig);
+ }
+ config.setClusterName("synchronization-caches");
+ return config;
+ }
+
+ private static QueueConfig createQueueConfig() {
+ final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig");
+ commonQueueConfig.setBackupCount(3);
+ commonQueueConfig.setAsyncBackupCount(3);
+ return commonQueueConfig;
+ }
+
+ private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) {
+ final MapConfig mapConfig = new MapConfig(configName);
+ mapConfig.setBackupCount(3);
+ mapConfig.setAsyncBackupCount(3);
+ mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds);
+ return mapConfig;
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
deleted file mode 100644
index 571558ac0c..0000000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ===========LICENSE_START========================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.config.embeddedcache;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases.
- */
-@Configuration
-public class SynchronizationSemaphoresConfig {
-
- private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30);
-
- /**
- * Module Sync Distributed Map Instance.
- *
- * @return configured map of module sync semaphores
- */
- @Bean
- public ConcurrentMap<String, Boolean> moduleSyncSemaphores() {
- return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig")
- .getMap("moduleSyncSemaphores");
- }
-
- /**
- * Data Sync Distributed Map Instance.
- *
- * @return configured map of data sync semaphores
- */
- @Bean
- public ConcurrentMap<String, Boolean> dataSyncSemaphores() {
- return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig")
- .getMap("dataSyncSemaphores");
- }
-
- private HazelcastInstance createHazelcastInstance(
- final String hazelcastInstanceName, final String configMapName) {
- return Hazelcast.newHazelcastInstance(
- initializeDefaultMapConfig(hazelcastInstanceName, configMapName));
- }
-
- private Config initializeDefaultMapConfig(final String instanceName, final String configName) {
- final Config config = new Config(instanceName);
- final MapConfig mapConfig = new MapConfig(configName);
- mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS);
- mapConfig.setBackupCount(3);
- mapConfig.setAsyncBackupCount(3);
- config.addMapConfig(mapConfig);
- config.setClusterName("synchronization-semaphores");
- return config;
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
index 45ba078044..107f8a04bb 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
@@ -21,7 +21,7 @@
package org.onap.cps.ncmp.api.inventory.sync;
import java.time.OffsetDateTime;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +29,6 @@ import org.onap.cps.api.CpsDataService;
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -47,8 +46,7 @@ public class DataSyncWatchdog {
private final SyncUtils syncUtils;
- @Qualifier("dataSyncSemaphores")
- private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+ private final Map<String, Boolean> dataSyncSemaphores;
/**
* Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
index c574aa61d9..7f61c476d5 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
@@ -86,19 +86,17 @@ public class ModuleSyncService {
}
/**
- * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists.
+ * Deletes the SchemaSet for schema set id if the SchemaSet Exists.
*
- * @param yangModelCmHandle the yang model of cm handle.
+ * @param schemaSetId the schema set id to be deleted
*/
- public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) {
- final String schemaSetAndAnchorName = yangModelCmHandle.getId();
+ public void deleteSchemaSetIfExists(final String schemaSetId) {
try {
- cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName,
+ cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId,
CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED);
- log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName);
+ log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId);
} catch (final SchemaSetNotFoundException e) {
- log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.",
- schemaSetAndAnchorName);
+ log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
new file mode 100644
index 0000000000..5e26650eda
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
+import org.onap.cps.ncmp.api.impl.utils.YangDataConverter;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.CmHandleState;
+import org.onap.cps.ncmp.api.inventory.CompositeState;
+import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.stereotype.Component;
+
+@RequiredArgsConstructor
+@Component
+@Slf4j
+public class ModuleSyncTasks {
+ private final InventoryPersistence inventoryPersistence;
+ private final SyncUtils syncUtils;
+ private final ModuleSyncService moduleSyncService;
+ private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+
+ private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
+
+ /**
+ * Perform module sync on a batch of cm handles.
+ *
+ * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+ * @return completed future to handle post-processing
+ */
+ public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+ for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+ final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+ final YangModelCmHandle yangModelCmHandle =
+ YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+ } catch (final Exception e) {
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+ }
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ }
+ updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+ return COMPLETED_FUTURE;
+ }
+
+ /**
+ * Reset state to "ADVISED" for any previously failed cm handles.
+ *
+ * @param failedCmHandles previously failed (locked) cm handles
+ * @return completed future to handle post-processing
+ */
+ public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
+ for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
+ final CompositeState compositeState = failedCmHandle.getCompositeState();
+ final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
+ if (isReadyForRetry) {
+ log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog",
+ failedCmHandle.getId());
+ cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+ }
+ }
+ updateCmHandlesStateBatch(cmHandleStatePerCmHandle);
+ return COMPLETED_FUTURE;
+ }
+
+ private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
+ final CompositeState.LockReason lockReason) {
+ advisedCmHandle.getCompositeState().setLockReason(lockReason);
+ }
+
+ private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
+ // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13)
+ for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) {
+ lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue());
+ }
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index be811a1147..8074fe6fe1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -21,17 +21,16 @@
package org.onap.cps.ncmp.api.inventory.sync;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
-import org.onap.cps.ncmp.api.inventory.CmHandleState;
-import org.onap.cps.ncmp.api.inventory.CompositeState;
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -40,75 +39,75 @@ import org.springframework.stereotype.Component;
@Component
public class ModuleSyncWatchdog {
- private static final boolean MODEL_SYNC_IN_PROGRESS = false;
- private static final boolean MODEL_SYNC_DONE = true;
-
- private final InventoryPersistence inventoryPersistence;
-
private final SyncUtils syncUtils;
+ private final BlockingQueue<DataNode> moduleSyncWorkQueue;
+ private final Map<String, Object> moduleSyncStartedOnCmHandles;
+ private final ModuleSyncTasks moduleSyncTasks;
- private final ModuleSyncService moduleSyncService;
-
- @Qualifier("moduleSyncSemaphores")
- private final ConcurrentMap<String, Boolean> moduleSyncSemaphores;
-
- private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+ private static final int MODULE_SYNC_BATCH_SIZE = 100;
+ private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
+ private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
/**
* Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
*/
- @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
- public void executeAdvisedCmHandlePoll() {
- syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
- final String cmHandleId = advisedCmHandle.getId();
- if (hasPushedIntoSemaphoreMap(cmHandleId)) {
- log.debug("executing module sync on {}", cmHandleId);
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
- lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY);
- updateModuleSyncSemaphoreMap(cmHandleId);
- } catch (final Exception e) {
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
- setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason());
- }
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
- } else {
- log.debug("{} already processed by another instance", cmHandleId);
- }
- });
- log.debug("No Cm-Handles currently found in an ADVISED state");
+ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
+ public void moduleSyncAdvisedCmHandles() {
+ populateWorkQueueIfNeeded();
+ while (!moduleSyncWorkQueue.isEmpty()) {
+ final Collection<DataNode> nextBatch = prepareNextBatch();
+ moduleSyncTasks.performModuleSync(nextBatch);
+ preventBusyWait();
+ }
}
/**
- * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'.
+ * Find any failed (locked) cm handles and change state back to 'ADVISED'.
*/
@Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
- public void executeLockedCmHandlePoll() {
- final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
- for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) {
- final CompositeState compositeState = lockedCmHandle.getCompositeState();
- final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
- if (isReadyForRetry) {
- log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId());
- lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED);
- }
- }
+ public void resetPreviouslyFailedCmHandles() {
+ final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+ moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
}
- private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
- final CompositeState.LockReason lockReason) {
- advisedCmHandle.getCompositeState().setLockReason(lockReason);
- lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED);
+ private void preventBusyWait() {
+ // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution
+ // but leaving here to minimize impacts on this class for that Jira
+ try {
+ TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
- moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE);
+ private void populateWorkQueueIfNeeded() {
+ if (moduleSyncWorkQueue.isEmpty()) {
+ final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles();
+ for (final DataNode advisedCmHandle : advisedCmHandles) {
+ if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
+ log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
+ }
+ }
+ }
}
- private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
- return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null;
+ private Collection<DataNode> prepareNextBatch() {
+ final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
+ log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size());
+ for (final DataNode batchCandidate : nextBatchCandidates) {
+ final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
+ final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP
+ .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP));
+ if (alreadyAddedToInProgressMap) {
+ log.debug("module sync for {} already in progress by other instance", cmHandleId);
+ } else {
+ nextBatch.add(batchCandidate);
+ }
+ }
+ log.debug("nextBatch size : {}", nextBatch.size());
+ return nextBatch;
}
+
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 16fb8f44d8..537f50122c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -64,19 +64,14 @@ public class SyncUtils {
private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:");
/**
- * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing.
+ * Query data nodes for cm handles with an "ADVISED" cm handle state.
*
- * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found
+ * @return cm handles (data nodes) in ADVISED state (empty list if none found)
*/
- public List<YangModelCmHandle> getAdvisedCmHandles() {
- final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
- cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED));
- log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
- if (advisedCmHandlesAsDataNodeList.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.shuffle(advisedCmHandlesAsDataNodeList);
- return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList);
+ public List<DataNode> getAdvisedCmHandles() {
+ final List<DataNode> advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED);
+ log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
+ return advisedCmHandlesAsDataNodes;
}
/**