aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniel Hanrahan <daniel.hanrahan@est.tech>2025-03-21 13:36:47 +0000
committerGerrit Code Review <gerrit@onap.org>2025-03-21 13:36:47 +0000
commit535a86cc4287a5901b2bbc59beab91eaab31c0a3 (patch)
tree45ff903062a310c2fd66eaa4293b42826fb4ad51
parentba7e941b728592eca44c28d71f048a6c213ef7a5 (diff)
parentbd7e9ceddb20ac179b88e7686da41a32d5b60633 (diff)
Merge "Implemented parallel execution for writeDataJob using ExecutorService with 10 concurrent threads"
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java4
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy65
2 files changed, 57 insertions, 12 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java
index 775e9d7b14..3c98d69554 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/ServiceConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 Nordix Foundation.
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,7 @@ public abstract class ServiceConfig {
private String connectionProviderName = "";
private int maximumInMemorySizeInMegabytes = 1;
private int maximumConnectionsTotal = 1;
- private int pendingAcquireMaxCount = 1;
+ private int pendingAcquireMaxCount = 10;
private Integer connectionTimeoutInSeconds = 1;
private long readTimeoutInSeconds = 1;
private long writeTimeoutInSeconds = 1;
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy
index c71426032d..de7ffabe5e 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/ncmp/WriteDataJobPerfTest.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2025 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@ import org.onap.cps.ncmp.api.datajobs.models.DataJobWriteRequest
import org.onap.cps.ncmp.api.datajobs.models.WriteOperation
import org.springframework.beans.factory.annotation.Autowired
import spock.lang.Ignore
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Executors
/**
* This test does not depend on common performance test data. Hence it just extends the integration spec base.
@@ -37,8 +39,6 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase {
@Autowired
DataJobService dataJobService
- def resourceMeter = new ResourceMeter()
-
def populateDataJobWriteRequests(int numberOfWriteOperations) {
def writeOperations = []
for (int i = 1; i <= numberOfWriteOperations; i++) {
@@ -52,16 +52,61 @@ class WriteDataJobPerfTest extends CpsIntegrationSpecBase {
@Ignore // CPS-2691
def 'Performance test for writeDataJob method'() {
- given: 'register 10_000 cm handles (with alternative ids)'
- registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(DMI1_URL, 'tagA', 10_000, 1, ModuleNameStrategy.UNIQUE, { it -> "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" })
+ given: 'register 10_000 cm handles (with alternate ids)'
+ registerTestCmHandles(10_000)
def dataJobWriteRequest = populateDataJobWriteRequests(10_000)
when: 'sending a write job to NCMP with dynamically generated write operations'
- resourceMeter.start()
- dataJobService.writeDataJob('', '', new DataJobMetadata('d1', '', ''), dataJobWriteRequest)
- resourceMeter.stop()
+ def executionResult = executeWriteJob('d1', dataJobWriteRequest)
then: 'record the result. Not asserted, just recorded in See https://lf-onap.atlassian.net/browse/CPS-2691'
- println "*** CPS-2691 Execution time: ${resourceMeter.totalTimeInSeconds} seconds"
+ println "*** CPS-2691 Execution time: ${executionResult.executionTime} seconds | Memory usage: ${executionResult.memoryUsage} MB"
+ cleanup: 'deregister test cm handles'
+ deregisterTestCmHandles(10_000)
+ }
+
+ @Ignore // CPS-2692
+ def 'Performance test for writeDataJob method with 10 parallel requests'() {
+ given: 'register 10_000 cm handles (with alternate ids)'
+ registerTestCmHandles(1_000)
+ when: 'sending 10 parallel write jobs to NCMP'
+ def executionResults = executeParallelWriteJobs(10, 1_000)
+ then: 'record execution times'
+ executionResults.eachWithIndex { result, index ->
+ logExecutionResults("CPS-2692 Job-${index + 1}", result)
+ }
cleanup: 'deregister test cm handles'
- deregisterSequenceOfCmHandles(DMI1_URL, 10_000, 1)
+ deregisterSequenceOfCmHandles(DMI1_URL, 1_000, 1)
+ }
+
+ def registerTestCmHandles(numberOfCmHandles) {
+ registerSequenceOfCmHandlesWithManyModuleReferencesButDoNotWaitForReady(
+ DMI1_URL, "tagA", numberOfCmHandles, 1, ModuleNameStrategy.UNIQUE,
+ { "/SubNetwork=Europe/SubNetwork=Ireland/MeContext=MyRadioNode${it}/ManagedElement=MyManagedElement${it}" }
+ )
+ }
+
+ def executeParallelWriteJobs(numberOfJobs, numberOfWriteOperations) {
+ def executorService = Executors.newFixedThreadPool(numberOfJobs)
+ def futures = (0..<numberOfJobs).collect { jobId ->
+ CompletableFuture.supplyAsync({ -> executeWriteJob(jobId, populateDataJobWriteRequests(numberOfWriteOperations)) }, executorService)
+ }
+ def executionResults = futures.collect { it.join() }
+ executorService.shutdown()
+ return executionResults
+ }
+
+ def executeWriteJob(jobId, dataJobWriteRequest) {
+ def localMeter = new ResourceMeter()
+ localMeter.start()
+ dataJobService.writeDataJob('', '', new DataJobMetadata("job-${jobId}", '', ''), dataJobWriteRequest)
+ localMeter.stop()
+ ['executionTime': localMeter.totalTimeInSeconds, 'memoryUsage': localMeter.totalMemoryUsageInMB]
+ }
+
+ def logExecutionResults(jobId, result) {
+ println "*** ${jobId} Execution time: ${result.executionTime} seconds | Memory usage: ${result.memoryUsage} MB"
+ }
+
+ def deregisterTestCmHandles(numberOfCmHandles) {
+ deregisterSequenceOfCmHandles(DMI1_URL, numberOfCmHandles, 1)
}
}