diff options
author | 2025-03-27 08:36:32 +0000 | |
---|---|---|
committer | 2025-03-27 08:36:32 +0000 | |
commit | e23f16986d43d447d599e1c85922333af9df9a9d (patch) | |
tree | def61849809a07d085d112841a6b492e3290a2b3 | |
parent | c4fa925253a0745f06dee87edb3ba2c752dd6e12 (diff) | |
parent | 5cb2bcce3907c7a11d34a2c01ccab6e7e81736d3 (diff) |
Merge "Use Flux stream processing for CM-handle searches"
8 files changed, 53 insertions, 55 deletions
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index d7b38d1a46..6215427dc1 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -1,7 +1,7 @@ /* * ============LICENSE_START======================================================= * Copyright (C) 2021 Pantheon.tech - * Modifications Copyright (C) 2021-2025 Nordix Foundation + * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe * Modifications Copyright (C) 2021 highstreet technologies GmbH * Modifications Copyright (C) 2021-2022 Bell Canada * ================================================================================ @@ -262,10 +262,9 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final CmHandleQueryParameters cmHandleQueryParameters) { final CmHandleQueryApiParameters cmHandleQueryApiParameters = deprecationHelper.mapOldConditionProperties(cmHandleQueryParameters); - final Collection<NcmpServiceCmHandle> cmHandles = networkCmProxyInventoryFacade - .executeCmHandleSearch(cmHandleQueryApiParameters); final List<RestOutputCmHandle> restOutputCmHandles = - cmHandles.stream().map(this::toRestOutputCmHandle).collect(Collectors.toList()); + networkCmProxyInventoryFacade.executeCmHandleSearch(cmHandleQueryApiParameters) + .map(this::toRestOutputCmHandle).collectList().block(); return ResponseEntity.ok(restOutputCmHandles); } diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index c3aca5a99b..94c113c053 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * Copyright (C) 2021 Pantheon.tech * Modifications Copyright (C) 2021 highstreet technologies GmbH - * Modifications Copyright (C) 2021-2024 Nordix Foundation + * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe * Modifications Copyright (C) 2021-2022 Bell Canada. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -59,6 +59,7 @@ import org.springframework.http.HttpStatus import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.test.web.servlet.MockMvc +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import spock.lang.Shared import spock.lang.Specification @@ -275,7 +276,7 @@ class NetworkCmProxyControllerSpec extends Specification { cmHandle2.alternateId = 'someAlternateId' cmHandle2.moduleSetTag = 'someModuleSetTag' cmHandle2.dataProducerIdentifier = 'someDataProducerIdentifier' - mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> [cmHandle1, cmHandle2] + mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> Flux.fromIterable([cmHandle1, cmHandle2]) when: 'the searches api is invoked' def response = mvc.perform(post(searchesEndpoint).contentType(MediaType.APPLICATION_JSON).content(jsonString)).andReturn().response then: 'response status returns OK' @@ -352,7 +353,7 @@ class NetworkCmProxyControllerSpec extends Specification { cmHandle2.cmHandleId = 'ch-2' cmHandle2.publicProperties = [color: 'green'] cmHandle2.currentTrustLevel = TrustLevel.NONE - mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> [cmHandle1, cmHandle2] + mockNetworkCmProxyInventoryFacade.executeCmHandleSearch(_) >> Flux.fromIterable([cmHandle1, cmHandle2]) when: 'the searches api is invoked' def response = mvc.perform(post(searchesEndpoint).contentType(MediaType.APPLICATION_JSON).content(jsonString)).andReturn().response then: 'an empty cm handle identifier is returned' diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java index 9bfb775d55..876a5e7c40 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import org.onap.cps.ncmp.api.inventory.models.CompositeState; import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistration; import org.onap.cps.ncmp.api.inventory.models.DmiPluginRegistrationResponse; import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; +import reactor.core.publisher.Flux; public interface NetworkCmProxyInventoryFacade { @@ -96,9 +97,9 @@ public interface NetworkCmProxyInventoryFacade { * Retrieve cm handles with details for the given query parameters. * * @param cmHandleQueryApiParameters cm handle query parameters - * @return cm handles with details + * @return cm handle objects as a reactive stream (flux) */ - Collection<NcmpServiceCmHandle> executeCmHandleSearch(final CmHandleQueryApiParameters cmHandleQueryApiParameters); + Flux<NcmpServiceCmHandle> executeCmHandleSearch(final CmHandleQueryApiParameters cmHandleQueryApiParameters); /** * Retrieve cm handle ids for the given query parameters. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/NetworkCmProxyInventoryFacadeImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/NetworkCmProxyInventoryFacadeImpl.java index 118c2bba70..7130afdcfd 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/NetworkCmProxyInventoryFacadeImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/NetworkCmProxyInventoryFacadeImpl.java @@ -1,7 +1,7 @@ /* * ============LICENSE_START======================================================= * Copyright (C) 2021 highstreet technologies GmbH - * Modifications Copyright (C) 2021-2024 Nordix Foundation + * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe * Modifications Copyright (C) 2021 Pantheon.tech * Modifications Copyright (C) 2021-2022 Bell Canada * Modifications Copyright (C) 2023 TechMahindra Ltd. @@ -52,6 +52,7 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; @Service @RequiredArgsConstructor @@ -118,15 +119,12 @@ public class NetworkCmProxyInventoryFacadeImpl implements NetworkCmProxyInventor } @Override - public Collection<NcmpServiceCmHandle> executeCmHandleSearch( + public Flux<NcmpServiceCmHandle> executeCmHandleSearch( final CmHandleQueryApiParameters cmHandleQueryApiParameters) { - final CmHandleQueryServiceParameters cmHandleQueryServiceParameters = jsonObjectMapper.convertToValueType( - cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class); + final CmHandleQueryServiceParameters cmHandleQueryServiceParameters = + jsonObjectMapper.convertToValueType(cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class); validateCmHandleQueryParameters(cmHandleQueryServiceParameters, CmHandleQueryConditions.ALL_CONDITION_NAMES); - final Collection<NcmpServiceCmHandle> ncmpServiceCmHandles = - parameterizedCmHandleQueryService.queryCmHandles(cmHandleQueryServiceParameters); - trustLevelManager.applyEffectiveTrustLevels(ncmpServiceCmHandles); - return ncmpServiceCmHandles; + return parameterizedCmHandleQueryService.queryCmHandles(cmHandleQueryServiceParameters); } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryService.java index fc8884433c..5a105b347a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryService.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022-2024 Nordix Foundation + * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.impl.inventory; import java.util.Collection; import org.onap.cps.ncmp.api.inventory.models.CmHandleQueryServiceParameters; import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; +import reactor.core.publisher.Flux; public interface ParameterizedCmHandleQueryService { /** @@ -31,6 +32,7 @@ public interface ParameterizedCmHandleQueryService { * public properties * modules * cps-path + * trust level * * @param cmHandleQueryServiceParameters the cm handle query parameters * @param outputAlternateId Boolean for cm handle reference type either @@ -62,19 +64,12 @@ public interface ParameterizedCmHandleQueryService { * public properties * modules * cps-path + * trust level * * @param cmHandleQueryServiceParameters the cm handle query parameters - * @return collection of cm handles + * @return cm handle objects as a reactive stream (flux) */ - Collection<NcmpServiceCmHandle> queryCmHandles(CmHandleQueryServiceParameters cmHandleQueryServiceParameters); - - /** - * Get all cm handle objects. - * Note: it is similar to all the queries above but simply no conditions and hence not 'parameterized' - * - * @return collection of cm handles - */ - Collection<NcmpServiceCmHandle> getAllCmHandles(); + Flux<NcmpServiceCmHandle> queryCmHandles(CmHandleQueryServiceParameters cmHandleQueryServiceParameters); /** * Retrieves all {@code NcmpServiceCmHandle} instances without their associated properties. diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceImpl.java index 84a4533d30..6076895f0f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceImpl.java @@ -48,16 +48,20 @@ import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.impl.inventory.models.InventoryQueryConditions; import org.onap.cps.ncmp.impl.inventory.models.PropertyType; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; +import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; @Service @RequiredArgsConstructor public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHandleQueryService { + private static final int FLUX_BUFFER_SIZE = 1000; private static final Collection<String> NO_QUERY_TO_EXECUTE = null; private final CmHandleQueryService cmHandleQueryService; private final InventoryPersistence inventoryPersistence; + private final TrustLevelManager trustLevelManager; @Override public Collection<String> queryCmHandleReferenceIds( @@ -82,24 +86,12 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan } @Override - public Collection<NcmpServiceCmHandle> queryCmHandles( - final CmHandleQueryServiceParameters cmHandleQueryServiceParameters) { - - if (cmHandleQueryServiceParameters.getCmHandleQueryParameters().isEmpty()) { - return getAllCmHandles(); - } - - final Collection<String> cmHandleIds = queryCmHandleReferenceIds(cmHandleQueryServiceParameters, false); - + public Flux<NcmpServiceCmHandle> queryCmHandles(final CmHandleQueryServiceParameters queryParameters) { + final Collection<String> cmHandleIds = queryCmHandleReferenceIds(queryParameters, false); return getNcmpServiceCmHandles(cmHandleIds); } @Override - public Collection<NcmpServiceCmHandle> getAllCmHandles() { - return toNcmpServiceCmHandles(inventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT)); - } - - @Override public Collection<NcmpServiceCmHandle> getAllCmHandlesWithoutProperties() { return toNcmpServiceCmHandles(inventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT, DIRECT_CHILDREN_ONLY)); } @@ -234,7 +226,14 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan return cmHandleQueryService.getAllCmHandleReferences(outputAlternateId); } - private Collection<NcmpServiceCmHandle> getNcmpServiceCmHandles(final Collection<String> cmHandleIds) { + private Flux<NcmpServiceCmHandle> getNcmpServiceCmHandles(final Collection<String> cmHandleIds) { + return Flux.fromIterable(cmHandleIds) + .buffer(FLUX_BUFFER_SIZE) + .map(this::getNcmpServiceCmHandleBatch) + .flatMap(Flux::fromIterable); + } + + private Collection<NcmpServiceCmHandle> getNcmpServiceCmHandleBatch(final Collection<String> cmHandleIds) { final Collection<YangModelCmHandle> yangModelcmHandles = inventoryPersistence.getYangModelCmHandles(cmHandleIds); @@ -243,6 +242,7 @@ public class ParameterizedCmHandleQueryServiceImpl implements ParameterizedCmHan yangModelcmHandles.forEach(yangModelcmHandle -> ncmpServiceCmHandles.add(YangDataConverter.toNcmpServiceCmHandle(yangModelcmHandle)) ); + trustLevelManager.applyEffectiveTrustLevels(ncmpServiceCmHandles); return ncmpServiceCmHandles; } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy index eff8082a0d..29cd92db3f 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021-2024 Nordix Foundation + * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. * Modifications Copyright (C) 2021 Pantheon.tech * Modifications Copyright (C) 2021-2022 Bell Canada * Modifications Copyright (C) 2023 TechMahindra Ltd. @@ -40,6 +40,7 @@ import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.utils.JsonObjectMapper +import reactor.core.publisher.Flux import spock.lang.Specification class NetworkCmProxyInventoryFacadeSpec extends Specification { @@ -249,11 +250,10 @@ class NetworkCmProxyInventoryFacadeSpec extends Specification { and: 'query cm handle method returns two cm handles' mockParameterizedCmHandleQueryService.queryCmHandles( spiedJsonObjectMapper.convertToValueType(cmHandleQueryApiParameters, CmHandleQueryServiceParameters.class)) - >> [new NcmpServiceCmHandle(cmHandleId: 'ch-0'), new NcmpServiceCmHandle(cmHandleId: 'ch-1')] - and: 'a trust level for cm handles' - 1 * mockTrustLevelManager.applyEffectiveTrustLevels(_) >> { args -> args[0].forEach{it.currentTrustLevel = TrustLevel.COMPLETE } } + >> Flux.fromIterable([new NcmpServiceCmHandle(cmHandleId: 'ch-0', currentTrustLevel: TrustLevel.COMPLETE), + new NcmpServiceCmHandle(cmHandleId: 'ch-1', currentTrustLevel: TrustLevel.COMPLETE)]) when: 'execute cm handle search is called' - def result = objectUnderTest.executeCmHandleSearch(cmHandleQueryApiParameters) + def result = objectUnderTest.executeCmHandleSearch(cmHandleQueryApiParameters).collectList().block() then: 'result consists of the two cm handles returned by the CPS Data Service' assert result.size() == 2 assert result[0].cmHandleId == 'ch-0' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceSpec.groovy index 4541576ee9..594d7fb31d 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/ParameterizedCmHandleQueryServiceSpec.groovy @@ -29,6 +29,7 @@ import org.onap.cps.api.exceptions.DataInUseException import org.onap.cps.api.exceptions.DataValidationException import org.onap.cps.api.model.ConditionProperties import org.onap.cps.api.model.DataNode +import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelManager import spock.lang.Specification import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT @@ -38,11 +39,12 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification { def cmHandleQueries = Mock(CmHandleQueryService) def partiallyMockedCmHandleQueries = Spy(CmHandleQueryService) def mockInventoryPersistence = Mock(InventoryPersistence) + def mockTrustLevelManager = Mock(TrustLevelManager) def dmiRegistry = new DataNode(xpath: NCMP_DMI_REGISTRY_PARENT, childDataNodes: createDataNodeList(['PNFDemo1', 'PNFDemo2', 'PNFDemo3', 'PNFDemo4'])) - def objectUnderTest = new ParameterizedCmHandleQueryServiceImpl(cmHandleQueries, mockInventoryPersistence) - def objectUnderTestWithPartiallyMockedQueries = new ParameterizedCmHandleQueryServiceImpl(partiallyMockedCmHandleQueries, mockInventoryPersistence) + def objectUnderTest = new ParameterizedCmHandleQueryServiceImpl(cmHandleQueries, mockInventoryPersistence, mockTrustLevelManager) + def objectUnderTestWithPartiallyMockedQueries = new ParameterizedCmHandleQueryServiceImpl(partiallyMockedCmHandleQueries, mockInventoryPersistence, mockTrustLevelManager) def 'Query cm handle ids with cpsPath.'() { given: 'a cmHandleWithCpsPath condition property' @@ -141,7 +143,7 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification { def conditionProperties = createConditionProperties('hasAllModules', [['moduleName': 'some-module-name']]) cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties]) when: 'the query is executed for cm handle ids' - def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters) + def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters).collectList().block() then: 'the inventory service is called with the correct module names' 1 * mockInventoryPersistence.getCmHandleReferencesWithGivenModules(['some-module-name'], false) >> ['ch1'] and: 'the inventory service is called with teh correct if and returns a yang model cm handle' @@ -171,10 +173,12 @@ class ParameterizedCmHandleQueryServiceSpec extends Specification { def 'Query cm handle details when the query is empty.'() { given: 'We use an empty query' def cmHandleQueryParameters = new CmHandleQueryServiceParameters() - and: 'the inventory persistence returns the dmi registry datanode with just ids' - mockInventoryPersistence.getDataNode(NCMP_DMI_REGISTRY_PARENT) >> [dmiRegistry] + and: 'the inventory persistence returns the cm handle ids of all cm handles' + cmHandleQueries.getAllCmHandleReferences(false) >> getCmHandleReferencesForDmiRegistry(false) + and: 'the inventory persistence returns the cm handle details when requested' + mockInventoryPersistence.getYangModelCmHandles(_) >> dmiRegistry.childDataNodes.collect { new YangModelCmHandle(id: it.leaves.get("id").toString(), dmiProperties: [], publicProperties: []) } when: 'the query is executed for both cm handle details' - def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters) + def result = objectUnderTest.queryCmHandles(cmHandleQueryParameters).collectList().block() then: 'the correct cm handles are returned' assert result.size() == 4 assert result.cmHandleId.containsAll('PNFDemo1', 'PNFDemo2', 'PNFDemo3', 'PNFDemo4') |