diff options
13 files changed, 330 insertions, 240 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java index 70d08dccdc..e13d3c2328 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminer.java @@ -32,6 +32,7 @@ import org.onap.cps.ncmp.api.datajobs.models.DmiWriteOperation; import org.onap.cps.ncmp.api.datajobs.models.ProducerKey; import org.onap.cps.ncmp.api.datajobs.models.WriteOperation; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.models.RequiredDmiService; import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.onap.cps.spi.model.DataNode; @@ -69,9 +70,11 @@ public class WriteRequestExaminer { final DataNode dataNode = alternateIdMatcher .getCmHandleDataNodeByLongestMatchingAlternateId(writeOperation.path(), PATH_SEPARATOR); - final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, dataNode); + final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode); + + final DmiWriteOperation dmiWriteOperation = createDmiWriteOperation(writeOperation, yangModelCmHandle); - final ProducerKey producerKey = createProducerKey(dataNode); + final ProducerKey producerKey = createProducerKey(yangModelCmHandle); final List<DmiWriteOperation> dmiWriteOperations; if (dmiWriteOperationsPerProducerKey.containsKey(producerKey)) { dmiWriteOperations = dmiWriteOperationsPerProducerKey.get(producerKey); @@ -82,24 +85,23 @@ public class WriteRequestExaminer { dmiWriteOperations.add(dmiWriteOperation); } - private ProducerKey createProducerKey(final DataNode dataNode) { - return new ProducerKey((String) dataNode.getLeaves().get("dmi-service-name"), - (String) dataNode.getLeaves().get("data-producer-identifier")); + private ProducerKey createProducerKey(final YangModelCmHandle yangModelCmHandle) { + return new ProducerKey(yangModelCmHandle.resolveDmiServiceName(RequiredDmiService.DATA), + yangModelCmHandle.getDataProducerIdentifier()); } private DmiWriteOperation createDmiWriteOperation(final WriteOperation writeOperation, - final DataNode dataNode) { + final YangModelCmHandle yangModelCmHandle) { return new DmiWriteOperation( writeOperation.path(), writeOperation.op(), - (String) dataNode.getLeaves().get("module-set-tag"), + yangModelCmHandle.getModuleSetTag(), writeOperation.value(), writeOperation.operationId(), - getPrivatePropertiesFromDataNode(dataNode)); + getPrivatePropertiesFromDataNode(yangModelCmHandle)); } - private Map<String, String> getPrivatePropertiesFromDataNode(final DataNode dataNode) { - final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(dataNode); + private Map<String, String> getPrivatePropertiesFromDataNode(final YangModelCmHandle yangModelCmHandle) { final Map<String, String> cmHandleDmiProperties = new LinkedHashMap<>(); yangModelCmHandle.getDmiProperties() .forEach(dmiProperty -> cmHandleDmiProperties.put(dmiProperty.getName(), dmiProperty.getValue())); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java index e8ee600ea9..80bc4ab69f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/AsyncTaskExecutor.java @@ -68,7 +68,7 @@ public class AsyncTaskExecutor { private void handleTaskCompletion(final Object response, final Throwable throwable) { if (throwable != null) { if (throwable instanceof TimeoutException) { - log.error("Async task didn't completed within the required time.", throwable); + log.error("Async task didn't complete within the required time.", throwable); } else { log.error("Watchdog async batch failed.", throwable); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java index e627f8f894..31fcbad08b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasks.java @@ -58,37 +58,16 @@ public class ModuleSyncTasks { */ public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes, final AtomicInteger batchCounter) { + final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = + new HashMap<>(cmHandlesAsDataNodes.size()); try { - final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle - = new HashMap<>(cmHandlesAsDataNodes.size()); - for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) { - final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id")); + cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> { final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode); - final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId); - final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState); - try { - if (inUpgrade) { - moduleSyncService.syncAndUpgradeSchemaSet(yangModelCmHandle); - } else { - moduleSyncService.deleteSchemaSetIfExists(cmHandleId); - moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); - } - yangModelCmHandle.getCompositeState().setLockReason(null); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY); - } catch (final Exception e) { - log.warn("Processing of {} module failed due to reason {}.", cmHandleId, e.getMessage()); - final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED - : LockReasonCategory.MODULE_SYNC_FAILED; - moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, - lockReasonCategory, e.getMessage()); - setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); - cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED); - } - log.info("{} is now in {} state", cmHandleId, cmHandelStatePerCmHandle.get(yangModelCmHandle).name()); - } - lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandelStatePerCmHandle); + cmHandleStatePerCmHandle.put(yangModelCmHandle, processCmHandle(yangModelCmHandle)); + }); } finally { batchCounter.getAndDecrement(); + lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); log.info("Processing module sync batch finished. {} batch(es) active.", batchCounter.get()); } return CompletableFuture.completedFuture(null); @@ -108,13 +87,36 @@ public class ModuleSyncTasks { final CompositeState compositeState = yangModelCmHandle.getCompositeState(); final String resetCmHandleId = yangModelCmHandle.getId(); log.debug("Resetting CM handle {} state to ADVISED for retry by the module-sync watchdog. Lock reason: {}", - yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); + yangModelCmHandle.getId(), compositeState.getLockReason().getLockReasonCategory().name()); cmHandleStatePerCmHandle.put(yangModelCmHandle, CmHandleState.ADVISED); removeResetCmHandleFromModuleSyncMap(resetCmHandleId); } lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle); } + private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle) { + final CompositeState compositeState = inventoryPersistence.getCmHandleState(yangModelCmHandle.getId()); + final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState); + try { + if (inUpgrade) { + moduleSyncService.syncAndUpgradeSchemaSet(yangModelCmHandle); + } else { + moduleSyncService.deleteSchemaSetIfExists(yangModelCmHandle.getId()); + moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle); + } + yangModelCmHandle.getCompositeState().setLockReason(null); + return CmHandleState.READY; + } catch (final Exception e) { + log.warn("Processing of {} module failed due to reason {}.", yangModelCmHandle.getId(), e.getMessage()); + final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED + : LockReasonCategory.MODULE_SYNC_FAILED; + moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, + lockReasonCategory, e.getMessage()); + setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason()); + return CmHandleState.LOCKED; + } + } + private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle, final CompositeState.LockReason lockReason) { advisedCmHandle.getCompositeState().setLockReason(lockReason); @@ -125,4 +127,4 @@ public class ModuleSyncTasks { log.info("{} removed from in progress map", resetCmHandleId); } } -} +}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandler.java index 6cce153269..de3df6b9da 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandler.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,14 +32,6 @@ import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; public interface LcmEventsCmHandleStateHandler { /** - * Updates the composite state of cmHandle based on cmHandleState. - * - * @param yangModelCmHandle cm handle represented as yang model - * @param targetCmHandleState target cm handle state - */ - void updateCmHandleState(final YangModelCmHandle yangModelCmHandle, final CmHandleState targetCmHandleState); - - /** * Updates the composite state of cmHandle based on cmHandleState in batch. * * @param cmHandleStatePerCmHandle Map of Yang Model Cm Handle and corresponding cm handle state. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerAsyncHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerAsyncHelper.java index cf7921c350..a53c902683 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerAsyncHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerAsyncHelper.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandlerImpl.CmHandleTransitionPair; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -38,25 +39,12 @@ public class LcmEventsCmHandleStateHandlerAsyncHelper { private final LcmEventsService lcmEventsService; /** - * Publish LCM Event in asynchronous manner. - * - * @param targetNcmpServiceCmHandle target NcmpServiceCmHandle - * @param currentNcmpServiceCmHandle current NcmpServiceCmHandle - */ - @Async("notificationExecutor") - public void publishLcmEventAsynchronously(final NcmpServiceCmHandle targetNcmpServiceCmHandle, - final NcmpServiceCmHandle currentNcmpServiceCmHandle) { - publishLcmEvent(targetNcmpServiceCmHandle, currentNcmpServiceCmHandle); - } - - /** * Publish LcmEvent in batches and in asynchronous manner. * * @param cmHandleTransitionPairs Pair of existing and modified cm handle represented as YangModelCmHandle */ @Async("notificationExecutor") - public void publishLcmEventBatchAsynchronously( - final Collection<LcmEventsCmHandleStateHandlerImpl.CmHandleTransitionPair> cmHandleTransitionPairs) { + public void publishLcmEventBatchAsynchronously(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) { cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> publishLcmEvent( toNcmpServiceCmHandle(cmHandleTransitionPair.getTargetYangModelCmHandle()), toNcmpServiceCmHandle(cmHandleTransitionPair.getCurrentYangModelCmHandle()))); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java index b1b7b955f7..e9bd37219a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java @@ -38,12 +38,10 @@ import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.api.inventory.models.CompositeState; -import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.impl.inventory.CompositeStateUtils; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.onap.cps.ncmp.impl.inventory.models.CmHandleState; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; -import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.springframework.stereotype.Service; @Slf4j @@ -55,25 +53,6 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState private final LcmEventsCmHandleStateHandlerAsyncHelper lcmEventsCmHandleStateHandlerAsyncHelper; @Override - public void updateCmHandleState(final YangModelCmHandle updatedYangModelCmHandle, - final CmHandleState targetCmHandleState) { - - final CompositeState compositeState = updatedYangModelCmHandle.getCompositeState(); - - if (isCompositeStateSame(compositeState, targetCmHandleState)) { - log.debug("CmHandle with id : {} already in state : {}", updatedYangModelCmHandle.getId(), - targetCmHandleState); - } else { - final YangModelCmHandle currentYangModelCmHandle = YangModelCmHandle.deepCopyOf(updatedYangModelCmHandle); - updateToSpecifiedCmHandleState(updatedYangModelCmHandle, targetCmHandleState); - persistCmHandle(updatedYangModelCmHandle, currentYangModelCmHandle); - lcmEventsCmHandleStateHandlerAsyncHelper.publishLcmEventAsynchronously( - toNcmpServiceCmHandle(updatedYangModelCmHandle), - toNcmpServiceCmHandle(currentYangModelCmHandle)); - } - } - - @Override @Timed(value = "cps.ncmp.cmhandle.state.update.batch", description = "Time taken to update a batch of cm handle states") public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) { @@ -113,28 +92,13 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState return cmHandleTransitionPairs; } - - private void persistCmHandle(final YangModelCmHandle targetYangModelCmHandle, - final YangModelCmHandle currentYangModelCmHandle) { - if (isNew(currentYangModelCmHandle.getCompositeState())) { - log.debug("Registering a new cm handle {}", targetYangModelCmHandle.getId()); - inventoryPersistence.saveCmHandle(targetYangModelCmHandle); - } else if (isDeleted(targetYangModelCmHandle.getCompositeState())) { - log.info("CmHandle with Id : {} is DELETED", targetYangModelCmHandle.getId()); - } else { - inventoryPersistence.saveCmHandleState(targetYangModelCmHandle.getId(), - targetYangModelCmHandle.getCompositeState()); - } - } - private void persistCmHandleBatch(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) { final List<YangModelCmHandle> newCmHandles = new ArrayList<>(); final Map<String, CompositeState> compositeStatePerCmHandleId = new LinkedHashMap<>(); cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> { - if (isNew(cmHandleTransitionPair.getCurrentYangModelCmHandle().getCompositeState() - )) { + if (isNew(cmHandleTransitionPair.getCurrentYangModelCmHandle().getCompositeState())) { newCmHandles.add(cmHandleTransitionPair.getTargetYangModelCmHandle()); } else if (!isDeleted(cmHandleTransitionPair.getTargetYangModelCmHandle().getCompositeState())) { compositeStatePerCmHandleId.put(cmHandleTransitionPair.getTargetYangModelCmHandle().getId(), @@ -145,10 +109,11 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState inventoryPersistence.saveCmHandleBatch(newCmHandles); inventoryPersistence.saveCmHandleStateBatch(compositeStatePerCmHandleId); + logCmHandleStateChanges(cmHandleTransitionPairs); } private void updateToSpecifiedCmHandleState(final YangModelCmHandle yangModelCmHandle, - final CmHandleState targetCmHandleState) { + final CmHandleState targetCmHandleState) { if (READY == targetCmHandleState) { setInitialStates(yangModelCmHandle); @@ -193,8 +158,11 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState return (compositeState != null && compositeState.getCmHandleState() == targetCmHandleState); } - private NcmpServiceCmHandle toNcmpServiceCmHandle(final YangModelCmHandle yangModelCmHandle) { - return YangDataConverter.toNcmpServiceCmHandle(yangModelCmHandle); + private static void logCmHandleStateChanges(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) { + cmHandleTransitionPairs.stream() + .map(CmHandleTransitionPair::getTargetYangModelCmHandle) + .forEach(yangModelCmHandle -> log.info("{} is now in {} state", yangModelCmHandle.getId(), + yangModelCmHandle.getCompositeState().getCmHandleState().name())); } @Getter diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy index 84eb78b751..47b57669ca 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/WriteRequestExaminerSpec.groovy @@ -3,6 +3,8 @@ package org.onap.cps.ncmp.impl.datajobs import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest import org.onap.cps.ncmp.api.datajobs.models.WriteOperation +import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle +import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.spi.model.DataNode import spock.lang.Specification @@ -60,4 +62,18 @@ class WriteRequestExaminerSpec extends Specification { then: 'we get the operation ids in the expected order.' assert dmiWriteOperations.operationId == ['1', '2', '3'] } + + def 'Validate the creation of a ProducerKey with correct dmiservicename.'() { + given: 'yangModelCmHandles with service name: "#dmiServiceName" and data service name: "#dataServiceName"' + def yangModelCmHandle = YangModelCmHandle.toYangModelCmHandle(dmiServiceName, dataServiceName, '', new NcmpServiceCmHandle(cmHandleId: 'cm-handle-id-1'), '', '', 'dpi1') + when: 'the ProducerKey is created' + def result = objectUnderTest.createProducerKey(yangModelCmHandle).toString() + then: 'we get the ProducerKey with the correct service name' + assert result == expectedProducerKey + where: 'the following services are registered' + dmiServiceName | dataServiceName || expectedProducerKey + 'dmi-service-name' | '' || 'dmi-service-name#dpi1' + '' | 'dmi-data-service-name' || 'dmi-data-service-name#dpi1' + 'dmi-service-name' | 'dmi-data-service-name' || 'dmi-service-name#dpi1' + } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy index 794bbc99d3..8ce1e934f2 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncTasksSpec.groovy @@ -37,6 +37,7 @@ import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler import org.onap.cps.spi.exceptions.DataNodeNotFoundException import org.onap.cps.spi.model.DataNode import org.slf4j.LoggerFactory +import spock.lang.Ignore import spock.lang.Specification import java.util.concurrent.atomic.AtomicInteger @@ -122,25 +123,25 @@ class ModuleSyncTasksSpec extends Specification { 'module upgrade' | MODULE_UPGRADE | 'Upgrade in progress' || MODULE_UPGRADE_FAILED } - // TODO Update this test once the bug CPS-2474 is fixed - def 'Module sync fails if a handle gets deleted during module sync.'() { + @Ignore // TODO Enable this test once the bug CPS-2474 is fixed + def 'Module sync succeeds even if a handle gets deleted during module sync.'() { given: 'cm handles in an ADVISED state' def cmHandle1 = cmHandleAsDataNodeByIdAndState('cm-handle-1', CmHandleState.ADVISED) def cmHandle2 = cmHandleAsDataNodeByIdAndState('cm-handle-2', CmHandleState.ADVISED) - and: 'inventory persistence returns the first handle with ADVISED state' - mockInventoryPersistence.getCmHandleState('cm-handle-1') >> new CompositeState(cmHandleState: CmHandleState.ADVISED) - and: 'inventory persistence cannot find the second handle' - mockInventoryPersistence.getCmHandleState('cm-handle-2') >> { throw new DataNodeNotFoundException('dataspace', 'anchor', 'xpath') } + and: 'inventory persistence cannot find the first handle' + mockInventoryPersistence.getCmHandleState('cm-handle-1') >> { throw new DataNodeNotFoundException('dataspace', 'anchor', 'xpath') } + and: 'inventory persistence returns the second handle with ADVISED state' + mockInventoryPersistence.getCmHandleState('cm-handle-2') >> new CompositeState(cmHandleState: CmHandleState.ADVISED) when: 'module sync poll is executed' objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount) - then: 'an exception is thrown' - thrown(DataNodeNotFoundException) - and: 'even though the existing cm-handle did sync' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' } - and: 'logs report the cm-handle is in READY state' - assert getLoggingEvent().formattedMessage == 'cm-handle-1 is now in READY state' - and: 'this is impossible as the state handler was not called at all' - 0 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) + then: 'no exception is thrown' + noExceptionThrown() + and: 'the deleted cm-handle did not sync' + 0 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-1' } + and: 'the existing cm-handle synced' + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assert args[0].id == 'cm-handle-2' } + and: 'the state handler called' + 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) } def 'Reset failed CM Handles #scenario.'() { @@ -174,7 +175,7 @@ class ModuleSyncTasksSpec extends Specification { when: 'module sync poll is executed' objectUnderTest.performModuleSync([cmHandle1], batchCount) then: 'module sync service is invoked for cm handle' - 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') } + 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) and: 'the entry for other cm handle is still in the progress map' assert moduleSyncStartedOnCmHandles.get('other-cm-handle') != null } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy index bd7c321bc7..4b676e1b4c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2023 Nordix Foundation + * Copyright (C) 2022-2024 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,19 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.impl.inventory.DataStoreSyncState import org.onap.cps.ncmp.impl.inventory.InventoryPersistence import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle +import org.slf4j.LoggerFactory import spock.lang.Specification +import static java.util.Collections.EMPTY_LIST +import static java.util.Collections.EMPTY_MAP import static org.onap.cps.ncmp.impl.inventory.models.CmHandleState.ADVISED import static org.onap.cps.ncmp.impl.inventory.models.CmHandleState.DELETED import static org.onap.cps.ncmp.impl.inventory.models.CmHandleState.DELETING @@ -35,6 +42,17 @@ import static org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory.MODULE_ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { + def logger = Spy(ListAppender<ILoggingEvent>) + + void setup() { + ((Logger) LoggerFactory.getLogger(LcmEventsCmHandleStateHandlerImpl.class)).addAppender(logger) + logger.start() + } + + void cleanup() { + ((Logger) LoggerFactory.getLogger(LcmEventsCmHandleStateHandlerImpl.class)).detachAndStopAllAppenders() + } + def mockInventoryPersistence = Mock(InventoryPersistence) def mockLcmEventsCreator = Mock(LcmEventsCreator) def mockLcmEventsService = Mock(LcmEventsService) @@ -51,30 +69,39 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { compositeState = new CompositeState(cmHandleState: fromCmHandleState) yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) when: 'update state is invoked' - objectUnderTest.updateCmHandleState(yangModelCmHandle, toCmHandleState) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, toCmHandleState)) then: 'state is saved using inventory persistence' - expectedCallsToInventoryPersistence * mockInventoryPersistence.saveCmHandleState(cmHandleId, _) + 1 * mockInventoryPersistence.saveCmHandleStateBatch(_) >> { + args -> { + def cmHandleStatePerCmHandleId = args[0] as Map<String, CompositeState> + assert cmHandleStatePerCmHandleId.get(cmHandleId).cmHandleState == toCmHandleState + } + } + and: 'log message shows state change at INFO level' + def loggingEvent = (ILoggingEvent) logger.list[0] + assert loggingEvent.level == Level.INFO + assert loggingEvent.formattedMessage == "${cmHandleId} is now in ${toCmHandleState} state" and: 'event service is called to publish event' - expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) where: 'state change parameters are provided' - stateChange | fromCmHandleState | toCmHandleState || expectedCallsToInventoryPersistence | expectedCallsToEventService - 'ADVISED to READY' | ADVISED | READY || 1 | 1 - 'READY to LOCKED' | READY | LOCKED || 1 | 1 - 'ADVISED to ADVISED' | ADVISED | ADVISED || 0 | 0 - 'READY to READY' | READY | READY || 0 | 0 - 'LOCKED to LOCKED' | LOCKED | LOCKED || 0 | 0 - 'DELETED to ADVISED' | DELETED | ADVISED || 0 | 1 + stateChange | fromCmHandleState | toCmHandleState + 'ADVISED to READY' | ADVISED | READY + 'READY to LOCKED' | READY | LOCKED + 'ADVISED to LOCKED' | ADVISED | LOCKED + 'ADVISED to DELETING' | ADVISED | DELETING } - def 'Update and Publish Events on State Change from NO_EXISTING state to ADVISED'() { + def 'Update and Publish Events on State Change from non-existing to ADVISED'() { given: 'Cm Handle represented as YangModelCmHandle' yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: []) when: 'update state is invoked' - objectUnderTest.updateCmHandleState(yangModelCmHandle, ADVISED) - then: 'state is saved using inventory persistence' - 1 * mockInventoryPersistence.saveCmHandle(yangModelCmHandle) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, ADVISED)) + then: 'CM-handle is saved using inventory persistence' + 1 * mockInventoryPersistence.saveCmHandleBatch(List.of(yangModelCmHandle)) and: 'event service is called to publish event' 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) + and: 'a log entry is written' + assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state" } def 'Update and Publish Events on State Change from LOCKED to ADVISED'() { @@ -83,69 +110,62 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { lockReason: CompositeState.LockReason.builder().lockReasonCategory(MODULE_SYNC_FAILED).details('some lock details').build()) yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) when: 'update state is invoked' - objectUnderTest.updateCmHandleState(yangModelCmHandle, ADVISED) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, ADVISED)) then: 'state is saved using inventory persistence and old lock reason details are retained' - 1 * mockInventoryPersistence.saveCmHandleState(cmHandleId, _) >> { + 1 * mockInventoryPersistence.saveCmHandleStateBatch(_) >> { args -> { - assert (args[1] as CompositeState).lockReason.details == 'some lock details' + def cmHandleStatePerCmHandleId = args[0] as Map<String, CompositeState> + assert cmHandleStatePerCmHandleId.get(cmHandleId).lockReason.details == 'some lock details' } } and: 'event service is called to publish event' 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) + and: 'a log entry is written' + assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state" } - def 'Update and Publish Events on State Change from DELETING to ADVISED'() { - given: 'Cm Handle represented as YangModelCmHandle in DELETING state' - yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) - when: 'update state is invoked' - objectUnderTest.updateCmHandleState(yangModelCmHandle, ADVISED) - then: 'the cm handle is saved using inventory persistence' - 1 * mockInventoryPersistence.saveCmHandle(yangModelCmHandle) - and: 'event service is called to publish event' - 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) - } - - def 'Update and Publish Events on State Change to READY'() { + def 'Update and Publish Events on State Change to from ADVISED to READY'() { given: 'Cm Handle represented as YangModelCmHandle' compositeState = new CompositeState(cmHandleState: ADVISED) yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) and: 'global sync flag is set' compositeState.setDataSyncEnabled(false) when: 'update cmhandle state is invoked' - objectUnderTest.updateCmHandleState(yangModelCmHandle, READY) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, READY)) then: 'state is saved using inventory persistence with expected dataSyncState' - 1 * mockInventoryPersistence.saveCmHandleState(cmHandleId, _) >> { + 1 * mockInventoryPersistence.saveCmHandleStateBatch(_) >> { args-> { - def result = (args[1] as CompositeState) - assert result.dataSyncEnabled == false - assert result.dataStores.operationalDataStore.dataStoreSyncState == DataStoreSyncState.NONE_REQUESTED - + def cmHandleStatePerCmHandleId = args[0] as Map<String, CompositeState> + assert cmHandleStatePerCmHandleId.get(cmHandleId).dataSyncEnabled == false + assert cmHandleStatePerCmHandleId.get(cmHandleId).dataStores.operationalDataStore.dataStoreSyncState == DataStoreSyncState.NONE_REQUESTED } } and: 'event service is called to publish event' 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) + and: 'a log entry is written' + assert getLogMessage(0) == "${cmHandleId} is now in READY state" } - def 'Update cmHandle state to "DELETING"' (){ + def 'Update cmHandle state from READY to DELETING' (){ given: 'cm Handle as Yang model' compositeState = new CompositeState(cmHandleState: READY) yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) when: 'updating cm handle state to "DELETING"' - objectUnderTest.updateCmHandleState(yangModelCmHandle, DELETING) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, DELETING)) then: 'the cm handle state is as expected' yangModelCmHandle.getCompositeState().getCmHandleState() == DELETING and: 'method to persist cm handle state is called once' - 1 * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.getId(), yangModelCmHandle.getCompositeState()) + 1 * mockInventoryPersistence.saveCmHandleStateBatch(Map.of(yangModelCmHandle.getId(), yangModelCmHandle.getCompositeState())) and: 'the method to publish Lcm event is called once' 1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _) } - def 'Update cmHandle state to "DELETED"' (){ + def 'Update cmHandle state to DELETING to DELETED' (){ given: 'cm Handle with state "DELETING" as Yang model ' compositeState = new CompositeState(cmHandleState: DELETING) yangModelCmHandle = new YangModelCmHandle(id: cmHandleId, dmiProperties: [], publicProperties: [], compositeState: compositeState) when: 'updating cm handle state to "DELETED"' - objectUnderTest.updateCmHandleState(yangModelCmHandle, DELETED) + objectUnderTest.updateCmHandleStateBatch(Map.of(yangModelCmHandle, DELETED)) then: 'the cm handle state is as expected' yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED and: 'the method to publish Lcm event is called once' @@ -157,14 +177,13 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { def cmHandleStateMap = setupBatch('NO_CHANGE') when: 'updating a batch of changes' objectUnderTest.updateCmHandleStateBatch(cmHandleStateMap) - then: 'batch is empty and nothing to update' - 1 * mockInventoryPersistence.saveCmHandleBatch(_) >> { - args -> { - assert (args[0] as Collection<YangModelCmHandle>).size() == 0 - } - } + then: 'no changes are persisted' + 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) + 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) and: 'no event will be published' 0 * mockLcmEventsService.publishLcmEvent(*_) + and: 'no log entries are written' + assert logger.list.empty } def 'Batch of new cm handles provided'() { @@ -178,8 +197,13 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { assert (args[0] as Collection<YangModelCmHandle>).id.containsAll('cmhandle1', 'cmhandle2') } } + and: 'no state updates are persisted' + 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) and: 'event service is called to publish events' 2 * mockLcmEventsService.publishLcmEvent(_, _, _) + and: 'two log entries are written' + assert getLogMessage(0) == 'cmhandle1 is now in ADVISED state' + assert getLogMessage(1) == 'cmhandle2 is now in ADVISED state' } def 'Batch of existing cm handles is updated'() { @@ -187,14 +211,19 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { def cmHandleStateMap = setupBatch('UPDATE') when: 'updating a batch of changes' objectUnderTest.updateCmHandleStateBatch(cmHandleStateMap) - then : 'existing cm handles composite state is persisted' + then: 'existing cm handles composite states are persisted' 1 * mockInventoryPersistence.saveCmHandleStateBatch(_) >> { args -> { - assert (args[0] as Map<String, CompositeState>).keySet().containsAll(['cmhandle1','cmhandle2']) + assert (args[0] as Map<String, CompositeState>).keySet().containsAll(['cmhandle1', 'cmhandle2']) } } + and: 'no new handles are persisted' + 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) and: 'event service is called to publish events' 2 * mockLcmEventsService.publishLcmEvent(_, _, _) + and: 'two log entries are written' + assert getLogMessage(0) == 'cmhandle1 is now in READY state' + assert getLogMessage(1) == 'cmhandle2 is now in DELETING state' } def 'Batch of existing cm handles is deleted'() { @@ -202,14 +231,30 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { def cmHandleStateMap = setupBatch('DELETED') when: 'updating a batch of changes' objectUnderTest.updateCmHandleStateBatch(cmHandleStateMap) - then : 'existing cm handles composite state is persisted' - 1 * mockInventoryPersistence.saveCmHandleStateBatch(_) >> { - args -> { - assert (args[0] as Map<String, CompositeState>).isEmpty() - } - } + then: 'state of deleted handles is not persisted' + 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) + and: 'no new handles are persisted' + 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) and: 'event service is called to publish events' 2 * mockLcmEventsService.publishLcmEvent(_, _, _) + and: 'two log entries are written' + assert getLogMessage(0) == 'cmhandle1 is now in DELETED state' + assert getLogMessage(1) == 'cmhandle2 is now in DELETED state' + } + + def 'Log entries and events are not sent when an error occurs during persistence'() { + given: 'A batch of updated cm handles' + def cmHandleStateMap = setupBatch('UPDATE') + and: 'an error will be thrown when trying to persist' + mockInventoryPersistence.saveCmHandleStateBatch(_) >> { throw new RuntimeException() } + when: 'updating a batch of changes' + objectUnderTest.updateCmHandleStateBatch(cmHandleStateMap) + then: 'the exception is not handled' + thrown(RuntimeException) + and: 'no events are published' + 0 * mockLcmEventsService.publishLcmEvent(_, _, _) + and: 'no log entries are written' + assert logger.list.empty } def setupBatch(type) { @@ -217,26 +262,31 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { def yangModelCmHandle1 = new YangModelCmHandle(id: 'cmhandle1', dmiProperties: [], publicProperties: []) def yangModelCmHandle2 = new YangModelCmHandle(id: 'cmhandle2', dmiProperties: [], publicProperties: []) - if ('NEW' == type) { - return [yangModelCmHandle1, yangModelCmHandle2] - } + switch (type) { + case 'NEW': + return [yangModelCmHandle1, yangModelCmHandle2] - if ('DELETED' == type) { - yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: READY) - yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) - return [(yangModelCmHandle1): DELETED, (yangModelCmHandle2): DELETED] - } + case 'DELETED': + yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: READY) + yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) + return [(yangModelCmHandle1): DELETED, (yangModelCmHandle2): DELETED] - if ('UPDATE' == type) { - yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) - yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) - return [(yangModelCmHandle1): READY, (yangModelCmHandle2): DELETING] - } + case 'UPDATE': + yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) + yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) + return [(yangModelCmHandle1): READY, (yangModelCmHandle2): DELETING] + + case 'NO_CHANGE': + yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) + yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) + return [(yangModelCmHandle1): ADVISED, (yangModelCmHandle2): READY] - if ('NO_CHANGE' == type) { - yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) - yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) - return [(yangModelCmHandle1): ADVISED, (yangModelCmHandle2): READY] + default: + throw new IllegalArgumentException("batch type '${type}' not recognized") } } + + def getLogMessage(index) { + return logger.list[index].formattedMessage + } } diff --git a/docker-compose/config/endurance.env b/docker-compose/config/endurance.env new file mode 100644 index 0000000000..0ca1a1149a --- /dev/null +++ b/docker-compose/config/endurance.env @@ -0,0 +1,35 @@ +DB_CONTAINER_NAME=endurance-dbpostgresql +DB_PORT=5433 + +NGINX_CONTAINER_NAME=endurance-nginx-loadbalancer +CPS_CORE_PORT=8884 + +ZOOKEEPER_CONTAINER_NAME=endurance-zookeeper +ZOOKEEPER_PORT=2182 + +KAFKA_CONTAINER_NAME=endurance-kafka +KAFKA_PORT=9093 + +NCMP_DMI_PLUGIN_CONTAINER_NAME=endurance-ncmp-dmi-plugin +DMI_PORT=8786 + +NCMP_DMI_PLUGIN_DEMO_AND_CSIT_STUB_CONTAINER_NAME=endurance-ncmp-dmi-plugin-demo-and-csit-stub +DMI_DEMO_STUB_PORT=8787 + +POLICY_EXECUTOR_STUB_CONTAINER_NAME=endurance-policy-executor-stub +POLICY_EXECUTOR_STUB_PORT=8788 + +PROMETHEUS_CONTAINER_NAME=endurance-prometheus +PROMETHEUS_PORT=9091 + +GRAFANA_CONTAINER_NAME=endurance-grafana +GRAFANA_PORT=3001 + +KAFKA_UI_CONTAINER_NAME=endurance-kafka-ui +KAFKA_UI_PORT=8090 + +JAEGER_SERVICE_CONTAINER_NAME=endurance-jaeger-service +JAEGER_SERVICE_PORT=16687 + +CPS_NCMP_CACHES_CLUSTER_NAME=endurance-cps-and-ncmp-common-cache-cluster +CPS_NCMP_INSTANCE_CONFIG_NAME=endurance-cps-and-ncmp-hazelcast-instance-config
\ No newline at end of file diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index b854064ca5..feb58d849d 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -24,9 +24,11 @@ services: ### docker-compose --profile dmi-stub --profile policy-executor-stub up -d -> run CPS with stubbed dmi-plugin and policy executor stub (for policy executor service testing make POLICY_SERVICE_ENABLED "true") ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ### ### DEBUG: Look for '### DEBUG' comments to enable CPS-NCMP debugging + ### docker-compose --profile dmi-stub --project-name endurance --env-file config/endurance.env up -d -> run CPS with stubbed dmi-plugin for endurance testing + ### docker-compose --profile dmi-stub --project-name endurance down --volumes dbpostgresql: - container_name: dbpostgresql + container_name: ${DB_CONTAINER_NAME:-dbpostgresql} image: postgres:14.1-alpine ports: - ${DB_PORT:-5432}:5432 @@ -80,7 +82,7 @@ services: ### DEBUG - ${CPS_CORE_DEBUG_PORT:-5005}:5005 nginx: - container_name: nginx-loadbalancer + container_name: ${NGINX_CONTAINER_NAME:-nginx-loadbalancer} image: nginx:latest ports: - ${CPS_CORE_PORT:-8883}:80 @@ -93,17 +95,17 @@ services: ### if kafka is not required comment out zookeeper and kafka ### zookeeper: image: confluentinc/cp-zookeeper:6.2.1 - container_name: zookeeper + container_name: ${ZOOKEEPER_CONTAINER_NAME:-zookeeper} ports: - - '2181:2181' + - ${ZOOKEEPER_PORT:-2181}:2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:6.2.1 - container_name: kafka + container_name: ${KAFKA_CONTAINER_NAME:-kafka} ports: - - '9092:9092' + - ${KAFKA_PORT:-9092}:9092 depends_on: - zookeeper environment: @@ -114,7 +116,7 @@ services: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ncmp-dmi-plugin: - container_name: ncmp-dmi-plugin + container_name: ${NCMP_DMI_PLUGIN_CONTAINER_NAME:-ncmp-dmi-plugin} image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/ncmp-dmi-plugin:${DMI_VERSION:-1.6.0-SNAPSHOT-latest} ports: - ${DMI_PORT:-8783}:8080 @@ -139,7 +141,7 @@ services: - dmi-service ncmp-dmi-plugin-demo-and-csit-stub: - container_name: ncmp-dmi-plugin-demo-and-csit-stub + container_name: ${NCMP_DMI_PLUGIN_DEMO_AND_CSIT_STUB_CONTAINER_NAME:-ncmp-dmi-plugin-demo-and-csit-stub} image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/dmi-plugin-demo-and-csit-stub:${DMI_DEMO_STUB_VERSION:-latest} ports: - ${DMI_DEMO_STUB_PORT:-8784}:8092 @@ -158,19 +160,19 @@ services: - dmi-service policy-executor-stub: - container_name: policy-executor-stub + container_name: ${POLICY_EXECUTOR_STUB_CONTAINER_NAME:-policy-executor-stub} image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/policy-executor-stub:latest ports: - - 8785:8093 + - ${POLICY_EXECUTOR_STUB_PORT:-8785}:8093 restart: unless-stopped profiles: - policy-executor-stub prometheus: - container_name: prometheus + container_name: ${PROMETHEUS_CONTAINER_NAME:-prometheus} image: prom/prometheus:latest ports: - - 9090:9090 + - ${PROMETHEUS_PORT:-9090}:9090 restart: always volumes: - ./config/prometheus.yml:/etc/prometheus/prometheus.yml @@ -180,12 +182,12 @@ services: grafana: image: grafana/grafana-oss:latest user: "" - container_name: grafana + container_name: ${GRAFANA_CONTAINER_NAME:-grafana} depends_on: prometheus: condition: service_started ports: - - 3000:3000 + - ${GRAFANA_PORT:-3000}:3000 volumes: - ./config/grafana/provisioning/:/etc/grafana/provisioning/ - ./config/grafana/jvm-micrometer-dashboard.json:/var/lib/grafana/dashboards/jvm-micrometer-dashboard.json @@ -197,10 +199,10 @@ services: - monitoring kafka-ui: - container_name: kafka-ui + container_name: ${KAFKA_UI_CONTAINER_NAME:-kafka-ui} image: provectuslabs/kafka-ui:latest ports: - - 8089:8080 + - ${KAFKA_UI_PORT:-8089}:8080 environment: DYNAMIC_CONFIG_ENABLED: 'true' KAFKA_CLUSTERS_0_NAME: 'cps-kafka-local' @@ -209,10 +211,10 @@ services: - monitoring jaeger-service: - container_name: jaeger-service + container_name: ${JAEGER_SERVICE_CONTAINER_NAME:-jaeger-service} image: jaegertracing/all-in-one:latest ports: - - 16686:16686 + - ${JAEGER_SERVICE_PORT:-16686}:16686 restart: unless-stopped profiles: - tracing diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy index 19b10a3c79..00ce38fa2d 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy @@ -20,7 +20,7 @@ package org.onap.cps.integration.functional.ncmp -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.integration.KafkaTestContainer import org.onap.cps.integration.base.CpsIntegrationSpecBase @@ -32,7 +32,6 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle import org.onap.cps.ncmp.events.lcm.v1.LcmEvent import org.onap.cps.ncmp.impl.inventory.models.CmHandleState import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory -import spock.lang.Ignore import spock.util.concurrent.PollingConditions import java.time.Duration @@ -42,21 +41,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { NetworkCmProxyInventoryFacade objectUnderTest def uniqueId = 'ch-unique-id-for-create-test' - def kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class) + static KafkaConsumer kafkaConsumer def setup() { objectUnderTest = networkCmProxyInventoryFacade + subscribeAndClearPreviousMessages() } - @Ignore - def 'CM Handle registration is successful.'() { + def cleanupSpec() { + kafkaConsumer.unsubscribe() + kafkaConsumer.close() + } + + def 'CM Handle registration.'() { given: 'DMI will return modules when requested' dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2'] dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2'] - and: 'consumer subscribed to topic' - kafkaConsumer.subscribe(['ncmp-events']) - when: 'a CM-handle is registered for creation' def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId) def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]) @@ -68,32 +69,33 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { and: 'CM-handle is initially in ADVISED state' assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState - and: 'the module sync watchdog is triggered' + then: 'the module sync watchdog is triggered' moduleSyncWatchdog.moduleSyncAdvisedCmHandles() - and: 'CM-handle goes to READY state after module sync' + then: 'CM-handle goes to READY state after module sync' new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> { assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState }) - and: 'the messages is polled' - def message = kafkaConsumer.poll(Duration.ofMillis(10000)) - def records = message.records(new TopicPartition('ncmp-events', 0)) - - and: 'the newest lcm event notification is received with READY state' - def notificationMessage = jsonObjectMapper.convertJsonString(records.last().value().toString(), LcmEvent) - /*TODO (Toine) This test was failing intermittently (when running as part of suite). - I suspect that it often gave false positives as the message being assert here was any random message created by previous tests - By checking the cm-handle and using an unique cm-handle in this test this flaw became obvious. - I have now ignored this test as it is out of scope of this commit to fix it. - Created: https://lf-onap.atlassian.net/browse/CPS-2468 to fix this instead - */ - assert notificationMessage.event.cmHandleId == uniqueId - assert notificationMessage.event.newValues.cmHandleState.value() == 'READY' - and: 'the CM-handle has expected modules' assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort() + then: 'get the latest messages' + def consumerRecords = getLatestConsumerRecords() + + and: 'both converted messages are for the correct cm handle' + def notificationMessages = [] + for (def consumerRecord : consumerRecords) { + notificationMessages.add(jsonObjectMapper.convertJsonString(consumerRecord.value().toString(), LcmEvent)) + } + assert notificationMessages.event.cmHandleId == [ uniqueId, uniqueId ] + + and: 'the oldest event is about the update to ADVISED state' + notificationMessages[0].event.newValues.cmHandleState.value() == 'ADVISED' + + and: 'the next event is about update to READY state' + notificationMessages[1].event.newValues.cmHandleState.value() == 'READY' + cleanup: 'deregister CM handle' deregisterCmHandle(DMI1_URL, uniqueId) } @@ -224,4 +226,23 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase { cleanup: 'deregister CM handles' deregisterCmHandles(DMI1_URL, ['ch-1', 'ch-2']) } + + def subscribeAndClearPreviousMessages() { + kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class) + kafkaConsumer.subscribe(['ncmp-events']) + kafkaConsumer.poll(Duration.ofMillis(500)) + } + + def getLatestConsumerRecords() { + def consumerRecords = [] + def retryAttempts = 10 + while (consumerRecords.size() < 2) { + retryAttempts-- + consumerRecords.addAll(kafkaConsumer.poll(Duration.ofMillis(100))) + if (retryAttempts == 0) + break + } + consumerRecords + } + } diff --git a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java index d41f752912..ff4aec4175 100644 --- a/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java +++ b/integration-test/src/test/java/org/onap/cps/integration/KafkaTestContainer.java @@ -21,6 +21,7 @@ package org.onap.cps.integration; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -33,11 +34,12 @@ import org.testcontainers.utility.DockerImageName; * This ensures only one instance of Kafka container across the integration tests. * Avoid unnecessary resource and time consumption. */ +@Slf4j public class KafkaTestContainer extends KafkaContainer { private static final String IMAGE_NAME_AND_VERSION = "registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1"; - private static KafkaTestContainer kafkaTestContainer; + private static volatile KafkaTestContainer kafkaTestContainer; private KafkaTestContainer() { super(DockerImageName.parse(IMAGE_NAME_AND_VERSION).asCompatibleSubstituteFor("confluentinc/cp-kafka")); @@ -51,8 +53,15 @@ public class KafkaTestContainer extends KafkaContainer { */ public static KafkaTestContainer getInstance() { if (kafkaTestContainer == null) { - kafkaTestContainer = new KafkaTestContainer(); - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::close)); + synchronized (KafkaTestContainer.class) { + if (kafkaTestContainer == null) { + kafkaTestContainer = new KafkaTestContainer(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutting down KafkaTestContainer..."); + kafkaTestContainer.stop(); + })); + } + } } return kafkaTestContainer; } @@ -63,8 +72,11 @@ public class KafkaTestContainer extends KafkaContainer { @Override public void start() { - super.start(); - System.setProperty("spring.kafka.properties.bootstrap.servers", kafkaTestContainer.getBootstrapServers()); + if (!isRunning()) { + super.start(); + System.setProperty("spring.kafka.properties.bootstrap.servers", getBootstrapServers()); + log.info("KafkaTestContainer started at {}", getBootstrapServers()); + } } @Override @@ -78,8 +90,9 @@ public class KafkaTestContainer extends KafkaContainer { configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers()); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer); - configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); + configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE); return configProps; } |