diff options
author | kissand <andras.zoltan.kiss@est.tech> | 2022-07-21 14:53:37 +0200 |
---|---|---|
committer | kissand <andras.zoltan.kiss@est.tech> | 2022-08-22 14:36:00 +0200 |
commit | 41de6d64d89b8caeddb88389cf49fb2fb9e368d4 (patch) | |
tree | 7ed9f301f582f7f9252517d1e3b4dd41775d4b01 /cps-ncmp-service | |
parent | a1a33160054bb9e7ffa57e18270dfa0f9a2ad77e (diff) |
Distributed datastore solution for Data Sync Watchdog
- update lombok config to handle Qualifier annotation
- update Semaphore config to use ConcurrentMap
- update SyncUtils to return a list of cm handles
- update DataSyncWatchdog and ModuleSyncWatchdog with Qualifier
- update DataSyncWatchdog to handle a list of cm handles
- Use get with xpath to check cm handle state
Issue-ID: CPS-1015
Change-Id: Icb39bd29f89e0020d49a1f8960476ffe81b12362
Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
Diffstat (limited to 'cps-ncmp-service')
12 files changed, 158 insertions, 134 deletions
diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config index 0736fc58c6..b60a192069 100644 --- a/cps-ncmp-service/lombok.config +++ b/cps-ncmp-service/lombok.config @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (C) 2021 Nordix Foundation +# Copyright (C) 2021-2022 Nordix Foundation # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,3 +18,4 @@ config.stopBubbling = true lombok.addLombokGeneratedAnnotation = true +lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java index 6696f8e5ad..f8836e6bf8 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java @@ -148,7 +148,7 @@ public class NetworkCmProxyCmHandlerQueryServiceImpl implements NetworkCmProxyCm cpsPathQueryResult = NO_QUERY_TO_EXECUTE; } else { try { - cpsPathQueryResult = cmHandleQueries.getCmHandleDataNodesByCpsPath( + cpsPathQueryResult = cmHandleQueries.queryCmHandleDataNodesByCpsPath( cpsPath.get("cpsPath"), INCLUDE_ALL_DESCENDANTS) .stream().map(this::createNcmpServiceCmHandle) .collect(Collectors.toMap(NcmpServiceCmHandle::getCmHandleId, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java index 1efe17695e..571558ac0c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java @@ -1,5 +1,5 @@ /* - * ============LICENSE_START======================================================== + * ===========LICENSE_START======================================================== * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -24,7 +24,6 @@ import com.hazelcast.config.Config; import com.hazelcast.config.MapConfig; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; -import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Bean; @@ -41,23 +40,23 @@ public class SynchronizationSemaphoresConfig { /** * Module Sync Distributed Map Instance. * - * @return configured map of module sync semaphore + * @return configured map of module sync semaphores */ @Bean - public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() { - return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig") - .getMap("moduleSyncSemaphore"); + public ConcurrentMap<String, Boolean> moduleSyncSemaphores() { + return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig") + .getMap("moduleSyncSemaphores"); } /** * Data Sync Distributed Map Instance. * - * @return configured map of data sync semaphore + * @return configured map of data sync semaphores */ @Bean - public Map<String, String> dataSyncSemaphoreMap() { - return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig") - .getMap("dataSyncSemaphore"); + public ConcurrentMap<String, Boolean> dataSyncSemaphores() { + return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig") + .getMap("dataSyncSemaphores"); } private HazelcastInstance createHazelcastInstance( diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java index 92387bab32..245161747d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.api.inventory; import static org.onap.cps.ncmp.api.impl.utils.YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle; import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS; +import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS; import java.util.Collection; import java.util.Collections; @@ -65,7 +66,7 @@ public class CmHandleQueries { final String cpsPath = "//public-properties[@name=\"" + publicPropertyQueryPair.getKey() + "\" and @value=\"" + publicPropertyQueryPair.getValue() + "\"]"; - final Collection<DataNode> dataNodes = getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS); + final Collection<DataNode> dataNodes = queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS); if (cmHandleIdToNcmpServiceCmHandles == null) { cmHandleIdToNcmpServiceCmHandles = collectDataNodesToNcmpServiceCmHandles(dataNodes); } else { @@ -108,8 +109,8 @@ public class CmHandleQueries { * @param cmHandleState cm handle state * @return a list of cm handles */ - public List<DataNode> getCmHandlesByState(final CmHandleState cmHandleState) { - return getCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]", + public List<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) { + return queryCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]", INCLUDE_ALL_DESCENDANTS); } @@ -119,21 +120,23 @@ public class CmHandleQueries { * @param cpsPath cps path for which the cmHandle is requested * @return a list of data nodes representing the cm handles. */ - public List<DataNode> getCmHandleDataNodesByCpsPath(final String cpsPath, - final FetchDescendantsOption fetchDescendantsOption) { + public List<DataNode> queryCmHandleDataNodesByCpsPath(final String cpsPath, + final FetchDescendantsOption fetchDescendantsOption) { return cpsDataPersistenceService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, cpsPath + ANCESTOR_CM_HANDLES, fetchDescendantsOption); } /** - * Method which returns cm handles by the cm handle id and state. + * Method to check the state of a cm handle with given id. + * * @param cmHandleId cm handle id - * @param cmHandleState cm handle state - * @return a list of cm handles + * @param requiredCmHandleState the required state of the cm handle + * @return a boolean, true if the state is equal to the required state */ - public List<DataNode> getCmHandlesByIdAndState(final String cmHandleId, final CmHandleState cmHandleState) { - return getCmHandleDataNodesByCpsPath("//cm-handles[@id='" + cmHandleId + "']/state[@cm-handle-state=\"" - + cmHandleState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS); + public boolean cmHandleHasState(final String cmHandleId, final CmHandleState requiredCmHandleState) { + final DataNode stateDataNode = getCmHandleState(cmHandleId); + final String cmHandleStateAsString = (String) stateDataNode.getLeaves().get("cm-handle-state"); + return CmHandleState.valueOf(cmHandleStateAsString).equals(requiredCmHandleState); } /** @@ -141,8 +144,8 @@ public class CmHandleQueries { * @param dataStoreSyncState sync state * @return a list of cm handles */ - public List<DataNode> getCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) { - return getCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\"" + public List<DataNode> queryCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) { + return queryCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\"" + dataStoreSyncState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS); } @@ -160,6 +163,12 @@ public class CmHandleQueries { return convertYangModelCmHandleToNcmpServiceCmHandle(YangDataConverter .convertCmHandleToYangModel(dataNode, dataNode.getLeaves().get("id").toString())); } + + private DataNode getCmHandleState(final String cmHandleId) { + final String xpath = "/dmi-registry/cm-handles[@id='" + cmHandleId + "']/state"; + return cpsDataPersistenceService.getDataNode(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, + xpath, OMIT_DESCENDANTS); + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java index 395fb01f45..45ba078044 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java @@ -1,5 +1,5 @@ /* - * ============LICENSE_START======================================================= + * ============LICENSE_START======================================================= * Copyright (C) 2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,14 +21,15 @@ package org.onap.cps.ncmp.api.inventory.sync; import java.time.OffsetDateTime; +import java.util.concurrent.ConcurrentMap; import java.util.function.Consumer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsDataService; -import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -37,39 +38,45 @@ import org.springframework.stereotype.Service; @Service public class DataSyncWatchdog { + private static final boolean DATA_SYNC_IN_PROGRESS = false; + private static final boolean DATA_SYNC_DONE = true; + private final InventoryPersistence inventoryPersistence; private final CpsDataService cpsDataService; private final SyncUtils syncUtils; + @Qualifier("dataSyncSemaphores") + private final ConcurrentMap<String, Boolean> dataSyncSemaphores; + /** * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in * 'UNSYNCHRONIZED'. */ @Scheduled(fixedDelayString = "${timers.cm-handle-data-sync.sleep-time-ms:30000}") public void executeUnSynchronizedReadyCmHandlePoll() { - YangModelCmHandle unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle(); - while (unSynchronizedReadyCmHandle != null) { + syncUtils.getUnsynchronizedReadyCmHandles().forEach(unSynchronizedReadyCmHandle -> { final String cmHandleId = unSynchronizedReadyCmHandle.getId(); - log.debug("Cm-Handles found in READY and UNSYNCHRONIZED state: {}", cmHandleId); - final CompositeState compositeState = inventoryPersistence - .getCmHandleState(cmHandleId); - final String resourceData = syncUtils.getResourceData(cmHandleId); - if (resourceData == null) { - log.debug("Error accessing the node for Cm-Handle: {}", cmHandleId); - } else if (unSynchronizedReadyCmHandle.getCompositeState().getDataSyncEnabled().equals(false)) { - log.debug("Error: data sync enabled for {} must be true." - + "Data sync enabled is currently set to false", cmHandleId); + if (hasPushedIntoSemaphoreMap(cmHandleId)) { + log.debug("Executing data sync on {}", cmHandleId); + final CompositeState compositeState = inventoryPersistence + .getCmHandleState(cmHandleId); + final String resourceData = syncUtils.getResourceData(cmHandleId); + if (resourceData == null) { + log.debug("Error retrieving resource data for Cm-Handle: {}", cmHandleId); + } else { + cpsDataService.saveData("NFP-Operational", cmHandleId, + resourceData, OffsetDateTime.now()); + setSyncStateToSynchronized().accept(compositeState); + inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + updateDataSyncSemaphoreMap(cmHandleId); + } } else { - cpsDataService.saveData("NFP-Operational", cmHandleId, - resourceData, OffsetDateTime.now()); - setSyncStateToSynchronized().accept(compositeState); - inventoryPersistence.saveCmHandleState(cmHandleId, compositeState); + log.debug("{} already processed by another instance", cmHandleId); } - unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle(); - } - log.debug("No Cm-Handles currently found in an READY State and Operational Sync State is UNSYNCHRONIZED"); + }); + log.debug("No Cm-Handles currently found in READY State and Operational Sync State is UNSYNCHRONIZED"); } private Consumer<CompositeState> setSyncStateToSynchronized() { @@ -81,4 +88,12 @@ public class DataSyncWatchdog { .lastSyncTime(CompositeState.nowInSyncTimeFormat()).build()); }; } + + private void updateDataSyncSemaphoreMap(final String cmHandleId) { + dataSyncSemaphores.replace(cmHandleId, DATA_SYNC_DONE); + } + + private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { + return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null; + } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index 7c2a4fc386..be811a1147 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -31,6 +31,7 @@ import org.onap.cps.ncmp.api.inventory.CmHandleState; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.api.inventory.LockReasonCategory; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -39,13 +40,17 @@ import org.springframework.stereotype.Component; @Component public class ModuleSyncWatchdog { + private static final boolean MODEL_SYNC_IN_PROGRESS = false; + private static final boolean MODEL_SYNC_DONE = true; + private final InventoryPersistence inventoryPersistence; private final SyncUtils syncUtils; private final ModuleSyncService moduleSyncService; - private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap; + @Qualifier("moduleSyncSemaphores") + private final ConcurrentMap<String, Boolean> moduleSyncSemaphores; private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler; @@ -100,10 +105,10 @@ public class ModuleSyncWatchdog { } private void updateModuleSyncSemaphoreMap(final String cmHandleId) { - moduleSyncSemaphoreMap.replace(cmHandleId, true); + moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE); } private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) { - return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null; + return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null; } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java index 2b7d3c99cb..64ce2186b4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java @@ -44,7 +44,6 @@ import org.onap.cps.ncmp.api.inventory.CmHandleQueries; import org.onap.cps.ncmp.api.inventory.CmHandleState; import org.onap.cps.ncmp.api.inventory.CompositeState; import org.onap.cps.ncmp.api.inventory.DataStoreSyncState; -import org.onap.cps.ncmp.api.inventory.InventoryPersistence; import org.onap.cps.ncmp.api.inventory.LockReasonCategory; import org.onap.cps.spi.FetchDescendantsOption; import org.onap.cps.spi.model.DataNode; @@ -56,8 +55,6 @@ import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class SyncUtils { - private final InventoryPersistence inventoryPersistence; - private final CmHandleQueries cmHandleQueries; private final DmiDataOperations dmiDataOperations; @@ -73,7 +70,7 @@ public class SyncUtils { */ public List<YangModelCmHandle> getAdvisedCmHandles() { final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>( - cmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED)); + cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED)); log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size()); if (advisedCmHandlesAsDataNodeList.isEmpty()) { return Collections.emptyList(); @@ -86,25 +83,26 @@ public class SyncUtils { * First query data nodes for cm handles with CM Handle Operational Sync State in "UNSYNCHRONIZED" and * randomly select a CM Handle and query the data nodes for CM Handle State in "READY". * - * @return a random yang model cm handle with State in READY and Operation Sync State in "UNSYNCHRONIZED", - * return null if not found + * @return a randomized yang model cm handle list with State in READY and Operation Sync State in "UNSYNCHRONIZED", + * return empty list if not found */ - public YangModelCmHandle getAnUnSynchronizedReadyCmHandle() { - final List<DataNode> unSynchronizedCmHandles = cmHandleQueries - .getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED); - if (unSynchronizedCmHandles.isEmpty()) { - return null; - } - Collections.shuffle(unSynchronizedCmHandles); - for (final DataNode cmHandle : unSynchronizedCmHandles) { - final String cmHandleId = cmHandle.getLeaves().get("id").toString(); - final List<DataNode> readyCmHandles = cmHandleQueries - .getCmHandlesByIdAndState(cmHandleId, CmHandleState.READY); - if (!readyCmHandles.isEmpty()) { - return inventoryPersistence.getYangModelCmHandle(cmHandleId); + public List<YangModelCmHandle> getUnsynchronizedReadyCmHandles() { + final List<DataNode> unsynchronizedCmHandles = cmHandleQueries + .queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED); + + final List<YangModelCmHandle> yangModelCmHandles = new ArrayList<>(); + for (final DataNode unsynchronizedCmHandle : unsynchronizedCmHandles) { + final String cmHandleId = unsynchronizedCmHandle.getLeaves().get("id").toString(); + if (cmHandleQueries.cmHandleHasState(cmHandleId, CmHandleState.READY)) { + yangModelCmHandles.addAll( + convertCmHandlesDataNodesToYangModelCmHandles( + Collections.singletonList(unsynchronizedCmHandle))); } } - return null; + + Collections.shuffle(yangModelCmHandles); + + return yangModelCmHandles; } /** @@ -113,9 +111,9 @@ public class SyncUtils { * @return a random LOCKED yang model cm handle, return null if not found */ public List<YangModelCmHandle> getModuleSyncFailedCmHandles() { - final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.getCmHandleDataNodesByCpsPath( - "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]", - FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); + final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.queryCmHandleDataNodesByCpsPath( + "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]", + FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS); return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList); } @@ -136,8 +134,8 @@ public class SyncUtils { } } compositeState.setLockReason(CompositeState.LockReason.builder() - .details(String.format("Attempt #%d failed: %s", attempt, errorMessage)) - .lockReasonCategory(lockReasonCategory).build()); + .details(String.format("Attempt #%d failed: %s", attempt, errorMessage)) + .lockReasonCategory(lockReasonCategory).build()); } @@ -150,8 +148,8 @@ public class SyncUtils { public boolean isReadyForRetry(final CompositeState compositeState) { int timeInMinutesUntilNextAttempt = 1; final OffsetDateTime time = - OffsetDateTime.parse(compositeState.getLastUpdateTime(), - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + OffsetDateTime.parse(compositeState.getLastUpdateTime(), + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); final Matcher matcher = retryAttemptPattern.matcher(compositeState.getLockReason().getDetails()); if (matcher.find()) { timeInMinutesUntilNextAttempt = (int) Math.pow(2, Integer.parseInt(matcher.group(1))); @@ -161,7 +159,7 @@ public class SyncUtils { final int timeSinceLastAttempt = (int) Duration.between(time, OffsetDateTime.now()).toMinutes(); if (timeInMinutesUntilNextAttempt >= timeSinceLastAttempt) { log.info("Time until next attempt is {} minutes: ", - timeInMinutesUntilNextAttempt - timeSinceLastAttempt); + timeInMinutesUntilNextAttempt - timeSinceLastAttempt); } return timeSinceLastAttempt > timeInMinutesUntilNextAttempt; } @@ -174,8 +172,8 @@ public class SyncUtils { */ public String getResourceData(final String cmHandleId) { final ResponseEntity<Object> resourceDataResponseEntity = dmiDataOperations.getResourceDataFromDmi( - cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL, - UUID.randomUUID().toString()); + cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL, + UUID.randomUUID().toString()); if (resourceDataResponseEntity.getStatusCode().is2xxSuccessful()) { return getFirstResource(resourceDataResponseEntity.getBody()); } @@ -190,9 +188,10 @@ public class SyncUtils { return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue())); } - private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles( - final List<DataNode> cmHandlesAsDataNodeList) { - return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode, - dataNode.getLeaves().get("id").toString())).collect(Collectors.toList()); + private static List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles( + final List<DataNode> cmHandlesAsDataNodeList) { + return cmHandlesAsDataNodeList.stream() + .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle, + cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList()); } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy index 19c5049021..f1294ced7a 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy @@ -21,7 +21,6 @@ package org.onap.cps.ncmp.api.impl import org.onap.cps.cpspath.parser.PathParsingException -import org.onap.cps.ncmp.api.NetworkCmProxyCmHandlerQueryService import org.onap.cps.ncmp.api.inventory.InventoryPersistence import org.onap.cps.ncmp.api.inventory.CmHandleQueries import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle @@ -52,7 +51,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification { def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']]) cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties]) and: 'cmHandleQueries returns a non null query result' - cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])] + cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])] and: 'CmHandleQueries returns cmHandles with the relevant query result' cmHandleQueries.combineCmHandleQueries(*_) >> ['PNFDemo1': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo1'), 'PNFDemo3': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo3')] when: 'the query is executed for both cm handle ids and details' @@ -70,7 +69,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification { def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']]) cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties]) and: 'cmHandleQueries throws a path parsing exception' - cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException } + cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException } when: 'the query is executed for both cm handle ids and details' objectUnderTest.queryCmHandleIds(cmHandleQueryParameters) objectUnderTest.queryCmHandles(cmHandleQueryParameters) @@ -134,7 +133,7 @@ class NetworkCmProxyCmHandlerQueryServiceSpec extends Specification { and: 'cmHandles are returned from the module names query' inventoryPersistence.queryAnchors(['some-module-name']) >> anchorsForModuleQuery and: 'cmHandleQueries returns a datanode result' - 2 * cmHandleQueries.getCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode] + 2 * cmHandleQueries.queryCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode] when: 'the query is executed for both cm handle ids and details' def returnedCmHandlesJustIds = objectUnderTest.queryCmHandleIds(cmHandleQueryParameters) def returnedCmHandlesWithData = objectUnderTest.queryCmHandles(cmHandleQueryParameters) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy index fe7ed9eeb3..ea84b440f3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy @@ -31,19 +31,19 @@ import spock.lang.Specification class SynchronizationSemaphoresConfigSpec extends Specification { @Autowired - private Map<String, String> moduleSyncSemaphore; + private Map<String, Boolean> moduleSyncSemaphores; @Autowired - private Map<String, String> dataSyncSemaphore; + private Map<String, Boolean> dataSyncSemaphores; def 'Embedded Sync Semaphores'() { - expect: 'system is able to create an instance of ModuleSyncSemaphore' - assert null != moduleSyncSemaphore - and: 'system is able to create an instance of DataSyncSemaphore' - assert null != dataSyncSemaphore + expect: 'system is able to create an instance of ModuleSyncSemaphores' + assert null != moduleSyncSemaphores + and: 'system is able to create an instance of DataSyncSemaphores' + assert null != dataSyncSemaphores and: 'we have 2 instances' assert Hazelcast.allHazelcastInstances.size() == 2 and: 'the names match' - assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphore', 'dataSyncSemaphore'] + assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores'] } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy index 10a5d62461..ff173300a6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy @@ -92,31 +92,31 @@ class CmHandleQueriesSpec extends Specification { cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry', '//state[@cm-handle-state="ADVISED"]/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> sampleDataNodes when: 'cm handles are fetched by state' - def result = objectUnderTest.getCmHandlesByState(cmHandleState) + def result = objectUnderTest.queryCmHandlesByState(cmHandleState) then: 'the returned result matches the result from the persistence service' assert result == sampleDataNodes } - def 'Get Cm Handles By State and Cm-Handle Id'() { + def 'Get Cm Handles state by Cm-Handle Id'() { given: 'a cm handle state to query' def cmHandleState = CmHandleState.READY and: 'cps data service returns a list of data nodes' - cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry', - '//cm-handles[@id=\'some-cm-handle\']/state[@cm-handle-state="'+ 'READY'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes + cpsDataPersistenceService.getDataNode('NCMP-Admin', 'ncmp-dmi-registry', + '/dmi-registry/cm-handles[@id=\'some-cm-handle\']/state', OMIT_DESCENDANTS) >> new DataNode(leaves: ['cm-handle-state': 'READY']) when: 'cm handles are fetched by state and id' - def result = objectUnderTest.getCmHandlesByIdAndState('some-cm-handle', cmHandleState) + def result = objectUnderTest.getCmHandleState('some-cm-handle') then: 'the returned result is a list of data nodes returned by cps data service' - assert result == sampleDataNodes + assert result == new DataNode(leaves: ['cm-handle-state': 'READY']) } - def 'Get Cm Handles By Operational Sync State : UNSYNCHRONIZED'() { + def 'Retrieve Cm Handles By Operational Sync State : UNSYNCHRONIZED'() { given: 'a cm handle state to query' def cmHandleState = CmHandleState.READY and: 'cps data service returns a list of data nodes' cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry', '//state/datastores/operational[@sync-state="'+'UNSYNCHRONIZED'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes when: 'cm handles are fetched by the UNSYNCHRONIZED operational sync state' - def result = objectUnderTest.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) + def result = objectUnderTest.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) then: 'the returned result is a list of data nodes returned by cps data service' assert result == sampleDataNodes } @@ -130,7 +130,7 @@ class CmHandleQueriesSpec extends Specification { cpsPath + '/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> Arrays.asList(cmHandleDataNode) when: 'get cm handles by cps path is invoked' - def result = objectUnderTest.getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS) + def result = objectUnderTest.queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS) then: 'the returned result is a list of data nodes returned by cps data service' assert result.contains(cmHandleDataNode) } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy index 650a779a42..605381970d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy @@ -26,10 +26,11 @@ import org.onap.cps.ncmp.api.inventory.CmHandleState import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.inventory.InventoryPersistence import org.onap.cps.ncmp.api.inventory.DataStoreSyncState -import spock.lang.Shared import spock.lang.Specification +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap -class DataSyncSpec extends Specification { +class DataSyncWatchdogSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) @@ -37,10 +38,11 @@ class DataSyncSpec extends Specification { def mockSyncUtils = Mock(SyncUtils) - @Shared + def stubbedMap = Stub(ConcurrentMap) + def jsonString = '{"stores:bookstore":{"categories":[{"code":"01"}]}}' - def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils) + def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils, stubbedMap as ConcurrentHashMap) def compositeState = getCompositeState() @@ -52,7 +54,7 @@ class DataSyncSpec extends Specification { given: 'sample resource data' def resourceData = jsonString and: 'sync utilities return a cm handle twice' - mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null] + mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2] when: 'data sync poll is executed' objectUnderTest.executeUnSynchronizedReadyCmHandlePoll() then: 'the inventory persistence cm handle returns a composite state for the first cm handle' @@ -73,24 +75,18 @@ class DataSyncSpec extends Specification { 1 * mockInventoryPersistence.saveCmHandleState('some-cm-handle-2', compositeState) } - def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node because #scenario'() { - given: 'a yang model cm handle' - def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: new CompositeState(dataSyncEnabled: dataSyncEnabled)) - and: 'sync utilities returns a single cm handle' - mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle, null] + def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node'() { + given: 'cm handles in an ready state and operational sync state in unsynchronized' + and: 'sync utilities return a cm handle twice' + mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1] when: 'data sync poll is executed' objectUnderTest.executeUnSynchronizedReadyCmHandlePoll() then: 'the inventory persistence cm handle returns a composite state for the first cm handle' - 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState + 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle-1') >> compositeState and: 'the sync util returns first resource data' - 1 * mockSyncUtils.getResourceData('some-cm-handle') >> resourceData + 1 * mockSyncUtils.getResourceData('some-cm-handle-1') >> null and: 'the cm-handle data is not saved' 0 * mockCpsDataService.saveData('NFP-Operational', 'some-cm-handle-1', jsonString, _) - where: - scenario | dataSyncEnabled | resourceData - 'data sync is not enabled' | false | jsonString - 'resource data is null' | true | null - 'data sync is not enabled and resource data is null' | false | null } def createSampleYangModelCmHandle(cmHandleId) { @@ -100,7 +96,7 @@ class DataSyncSpec extends Specification { def getCompositeState() { def cmHandleState = CmHandleState.READY - def compositeState = new CompositeState(cmHandleState: cmHandleState, dataSyncEnabled: true) + def compositeState = new CompositeState(cmHandleState: cmHandleState) compositeState.setDataStores(CompositeState.DataStores.builder() .operationalDataStore(CompositeState.Operational.builder().dataStoreSyncState(DataStoreSyncState.SYNCHRONIZED) .build()).build()) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy index fb4ca3933d..52fb110b33 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.onap.cps.ncmp.api.impl.operations.DmiDataOperations import org.onap.cps.ncmp.api.impl.operations.DmiOperations import org.onap.cps.ncmp.api.inventory.CmHandleQueries +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.inventory.CmHandleState import org.onap.cps.ncmp.api.inventory.CompositeState import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder @@ -54,7 +55,7 @@ class SyncUtilsSpec extends Specification{ def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) - def objectUnderTest = new SyncUtils(mockInventoryPersistence, mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper) + def objectUnderTest = new SyncUtils(mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper) @Shared def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(OffsetDateTime.now()) @@ -68,7 +69,7 @@ class SyncUtilsSpec extends Specification{ def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() { given: 'the inventory persistence service returns a collection of data nodes' - mockCmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection + mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection and: 'we have some additional (dmi, private) properties' dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]' dataNode.childDataNodes = [dataNodeAdditionalProperties] @@ -106,7 +107,7 @@ class SyncUtilsSpec extends Specification{ def 'Get all locked Cm-Handle where Lock Reason is LOCKED_MODULE_SYNC_FAILED cm handle #scenario'() { given: 'the cps (persistence service) returns a collection of data nodes' - mockCmHandleQueries.getCmHandleDataNodesByCpsPath( + mockCmHandleQueries.queryCmHandleDataNodesByCpsPath( '//lock-reason[@reason="LOCKED_MODULE_SYNC_FAILED"]', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [dataNode] when: 'get locked Misbehaving cm handle is called' @@ -132,21 +133,21 @@ class SyncUtilsSpec extends Specification{ } - def 'Get a Cm-Handle where Operational Sync state is UnSynchronized and Cm-handle state is READY and #scenario'() { + def 'Get a Cm-Handle where #scenario'() { given: 'the inventory persistence service returns a collection of data nodes' - mockCmHandleQueries.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes - mockCmHandleQueries.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes + mockCmHandleQueries.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes + mockCmHandleQueries.cmHandleHasState('cm-handle-123', CmHandleState.READY) >> cmHandleHasState when: 'get advised cm handles are fetched' - objectUnderTest.getAnUnSynchronizedReadyCmHandle() + def yangModelCollection = objectUnderTest.getUnsynchronizedReadyCmHandles() then: 'the returned data node collection is the correct size' - readyDataNodes.size() == expectedDataNodeSize - and: 'get yang model cm handles is invoked the correct number of times' - expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123') + yangModelCollection.size() == expectedDataNodeSize + and: 'the result contains the correct data' + yangModelCollection.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == expectedYangModelCollectionIds where: 'the following scenarios are used' - scenario | unSynchronizedDataNodes | readyDataNodes || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize - 'exists' | [dataNode] | [dataNode] || 1 | 1 - 'unsynchronized exist but not ready' | [dataNode] | [] || 0 | 0 - 'does not exist' | [] | [] || 0 | 0 + scenario | unSynchronizedDataNodes | cmHandleHasState || expectedDataNodeSize | expectedYangModelCollectionIds + 'a Cm-Handle unsynchronized and ready' | [dataNode] | true || 1 | ['cm-handle-123'] as Set + 'a Cm-Handle unsynchronized but not ready' | [dataNode] | false || 0 | [] as Set + 'all Cm-Handle synchronized' | [] | false || 0 | [] as Set } def 'Get resource data through DMI Operations #scenario'() { |