diff options
author | 2025-03-21 13:36:47 +0000 | |
---|---|---|
committer | 2025-03-21 13:36:47 +0000 | |
commit | 535a86cc4287a5901b2bbc59beab91eaab31c0a3 (patch) | |
tree | 45ff903062a310c2fd66eaa4293b42826fb4ad51 | |
parent | ba7e941b728592eca44c28d71f048a6c213ef7a5 (diff) | |
parent | bd7e9ceddb20ac179b88e7686da41a32d5b60633 (diff) |
Merge "Implemented parallel execution for writeDataJob using ExecutorService with 10 concurrent threads"
2 files changed, 57 insertions, 12 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java index 775e9d7b14..3c98d69554 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2024-2025 Nordix Foundation. + * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ public abstract class ServiceConfig { private String connectionProviderName = ""; private int maximumInMemorySizeInMegabytes = 1; private int maximumConnectionsTotal = 1; - private int pendingAcquireMaxCount = 1; + private int pendingAcquireMaxCount = 10; private Integer connectionTimeoutInSeconds = 1; private long readTimeoutInSeconds = 1; private long writeTimeoutInSeconds = 1; diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy index c71426032d..de7ffabe5e 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2025 Nordix Foundation + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the 'License'); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest import org.onap.cps.ncmp.api.datajobs.models.WriteOperation import org.springframework.beans.factory.annotation.Autowired import spock.lang.Ignore +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors /** * This test does not depend on common performance test data. Hence it just extends the integration spec base. @@ -37,8 +39,6 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase { @Autowired DataJobService dataJobService - def resourceMeter = new ResourceMeter() - def populateDataJobWriteRequests(int numberOfWriteOperations) { def writeOperations = [] for (int i = 1; i <= numberOfWriteOperations; i++) { @@ -52,16 +52,61 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase { @Ignore // CPS-2691 def 'Performance test for writeDataJob method'() { - given: 'register 10_000 cm handles (with alternative ids)' - registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 10_000, 1, ModuleNameStrategy.UNIQUE, { it -> "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" }) + given: 'register 10_000 cm handles (with alternate ids)' + registerTestCmHandles(10_000) def dataJobWriteRequest = populateDataJobWriteRequests(10_000) when: 'sending a write job to NCMP with dynamically generated write operations' - resourceMeter.start() - dataJobService.writeDataJob('', '', new DataJobMetadata('d1', '', ''), dataJobWriteRequest) - resourceMeter.stop() + def executionResult = executeWriteJob('d1', dataJobWriteRequest) then: 'record the result. Not asserted, just recorded in See https://lf-onap.atlassian.net/browse/CPS-2691' - println "*** CPS-2691 Execution time: ${resourceMeter.totalTimeInSeconds} seconds" + println "*** CPS-2691 Execution time: ${executionResult.executionTime} seconds | Memory usage: ${executionResult.memoryUsage} MB" + cleanup: 'deregister test cm handles' + deregisterTestCmHandles(10_000) + } + + @Ignore // CPS-2692 + def 'Performance test for writeDataJob method with 10 parallel requests'() { + given: 'register 10_000 cm handles (with alternate ids)' + registerTestCmHandles(1_000) + when: 'sending 10 parallel write jobs to NCMP' + def executionResults = executeParallelWriteJobs(10, 1_000) + then: 'record execution times' + executionResults.eachWithIndex { result, index -> + logExecutionResults("CPS-2692 Job-${index + 1}", result) + } cleanup: 'deregister test cm handles' - deregisterSequenceOfCmHandles(DMI1_URL, 10_000, 1) + deregisterSequenceOfCmHandles(DMI1_URL, 1_000, 1) + } + + def registerTestCmHandles(numberOfCmHandles) { + registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady( + DMI1_URL, "tagA", numberOfCmHandles, 1, ModuleNameStrategy.UNIQUE, + { "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" } + ) + } + + def executeParallelWriteJobs(numberOfJobs, numberOfWriteOperations) { + def executorService = Executors.newFixedThreadPool(numberOfJobs) + def futures = (0..<numberOfJobs).collect { jobId -> + CompletableFuture.supplyAsync({ -> executeWriteJob(jobId, populateDataJobWriteRequests(numberOfWriteOperations)) }, executorService) + } + def executionResults = futures.collect { it.join() } + executorService.shutdown() + return executionResults + } + + def executeWriteJob(jobId, dataJobWriteRequest) { + def localMeter = new ResourceMeter() + localMeter.start() + dataJobService.writeDataJob('', '', new DataJobMetadata("job-${jobId}", '', ''), dataJobWriteRequest) + localMeter.stop() + ['executionTime': localMeter.totalTimeInSeconds, 'memoryUsage': localMeter.totalMemoryUsageInMB] + } + + def logExecutionResults(jobId, result) { + println "*** ${jobId} Execution time: ${result.executionTime} seconds | Memory usage: ${result.memoryUsage} MB" + } + + def deregisterTestCmHandles(numberOfCmHandles) { + deregisterSequenceOfCmHandles(DMI1_URL, numberOfCmHandles, 1) } } |