aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java8
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java5
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java21
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java17
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java2
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy7
-rw-r--r--cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy2
-rwxr-xr-xcsit/prepare-csit.sh2
-rw-r--r--csit/pylibs.txt2
-rw-r--r--csit/tests/cps-data-operations/cps-data-operations.robot3
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy16
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy5
-rw-r--r--integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy6
13 files changed, 50 insertions, 46 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
index 50d96f858c..9cfc49f1d4 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
@@ -31,15 +31,13 @@ public interface DataJobStatusService {
*
* @param authorization The authorization header from the REST request.
* @param dmiServiceName The name of the DMI Service relevant to the data job.
- * @param requestId The unique identifier for the overall data job request.
- * @param dataProducerJobId The identifier of the data producer job within the DMI system.
* @param dataProducerId The ID of the producer registered by DMI, used for operations related to this request.
* This could include alternate IDs or specific identifiers.
+ * @param dataProducerJobId The identifier of the data producer job within the DMI system.
* @return The current status of the data job as a String.
*/
String getDataJobStatus(final String authorization,
final String dmiServiceName,
- final String requestId,
- final String dataProducerJobId,
- final String dataProducerId);
+ final String dataProducerId,
+ final String dataProducerJobId);
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
index 0e0498ec49..a7a6573279 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
@@ -25,17 +25,20 @@ import java.util.Collection;
/**
* Request data for a write operation by the DMI Plugin.
*
+ * @param destination The destination of the results. ( e.g. S3 Bucket)
* @param dataAcceptType Define the data response accept type.
* e.g. "application/vnd.3gpp.object-tree-hierarchical+json",
* "application/vnd.3gpp.object-tree-flat+json" etc.
* @param dataContentType Define the data request content type.
* e.g. "application/3gpp-json-patch+json" etc.
* @param dataProducerId Identifier of the data producer.
- *
+ * @param dataJobId Identifier for the overall Datajob
* @param data A collection of outgoing write operations.
*/
public record SubJobWriteRequest (
+ String destination,
String dataAcceptType,
String dataContentType,
String dataProducerId,
+ String dataJobId,
Collection<DmiWriteOperation> data) {} \ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
index a6ecaa1097..fb17f066ce 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
@@ -42,26 +42,25 @@ public class DataJobStatusServiceImpl implements DataJobStatusService {
@Override
public String getDataJobStatus(final String authorization,
final String dmiServiceName,
- final String requestId,
- final String dataProducerJobId,
- final String dataProducerId) {
+ final String dataProducerId,
+ final String dataProducerJobId) {
- final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId,
- dataProducerJobId, dataProducerId);
+ final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName,
+ dataProducerId,
+ dataProducerJobId);
return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block();
}
private UrlTemplateParameters buildUrlParameters(final String dmiServiceName,
- final String requestId,
- final String dataProducerJobId,
- final String dataProducerId) {
+ final String dataProducerId,
+ final String dataProducerJobId) {
return DmiServiceUrlTemplateBuilder.newInstance()
- .fixedPathSegment("dataJob")
- .variablePathSegment("requestId", requestId)
+ .fixedPathSegment("cmwriteJob")
+ .fixedPathSegment("dataProducer")
+ .variablePathSegment("dataProducerId", dataProducerId)
.fixedPathSegment("dataProducerJob")
.variablePathSegment("dataProducerJobId", dataProducerJobId)
.fixedPathSegment("status")
- .queryParameter("dataProducerId", dataProducerId)
.createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
index c93709ce75..0d14dace5e 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
@@ -64,10 +64,15 @@ public class DmiSubJobRequestHandler {
final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) {
final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size());
dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> {
- final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.dataAcceptType(),
- dataJobMetadata.dataContentType(), producerKey.dataProducerIdentifier(), dmi3ggpWriteOperations);
+ final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.destination(),
+ dataJobMetadata.dataAcceptType(),
+ dataJobMetadata.dataContentType(),
+ producerKey.dataProducerIdentifier(),
+ dataJobId,
+ dmi3ggpWriteOperations);
- final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobId, producerKey);
+ final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobMetadata.destination(),
+ producerKey);
final ResponseEntity<Object> responseEntity = dmiRestClient.synchronousPostOperationWithJsonData(
RequiredDmiService.DATA,
urlTemplateParameters,
@@ -82,10 +87,10 @@ public class DmiSubJobRequestHandler {
return subJobWriteResponses;
}
- private UrlTemplateParameters getUrlTemplateParameters(final String dataJobId, final ProducerKey producerKey) {
+ private UrlTemplateParameters getUrlTemplateParameters(final String destination, final ProducerKey producerKey) {
return DmiServiceUrlTemplateBuilder.newInstance()
- .fixedPathSegment("writeJob")
- .variablePathSegment("requestId", dataJobId)
+ .fixedPathSegment("cmwriteJob")
+ .queryParameter("destination", destination)
.createUrlTemplateParameters(producerKey.dmiServiceName(), dmiProperties.getDmiBasePath());
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
index ba6bba9c53..177b4b0bf2 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
@@ -160,7 +160,7 @@ public class DmiRestClient {
.headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
.retrieve()
.bodyToMono(JsonNode.class)
- .map(responseHealthStatus -> responseHealthStatus.path("status").asText())
+ .map(jsonNode -> jsonNode.path("status").asText())
.onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName()));
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
index cc042988f6..d231dfa755 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
@@ -39,15 +39,14 @@ class DataJobStatusServiceImplSpec extends Specification {
def 'Forward a data job status query to DMI.' () {
given: 'the required parameters for querying'
def dmiServiceName = 'some-dmi-service'
- def requestId = 'some-request-id'
+ def dataProducerId = 'some-data-producer-id'
def dataProducerJobId = 'some-data-producer-job-id'
- def dataJobId = 'some-data-job-id'
def authorization = 'my authorization header'
- def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/dataJob/{requestId}/dataProducerJob/{dataProducerJobId}/status?dataProducerId={dataProducerId}', ['dataProducerJobId':'some-data-producer-job-id', 'dataProducerId':'some-data-job-id', 'requestId':'some-request-id'])
+ def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/cmwriteJob/dataProducer/{dataProducerId}/dataProducerJob/{dataProducerJobId}/status', ['dataProducerId':'some-data-producer-id', 'dataProducerJobId':'some-data-producer-job-id'])
and: 'the rest client returns a status for the given parameters'
mockDmiRestClient.getDataJobStatus(urlParams, authorization) >> Mono.just('some status')
when: 'the job status is queried'
- def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataJobId)
+ def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId)
then: 'the status from the rest client is returned'
assert status == 'some status'
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
index b3dd02dec3..041fbd95ee 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
@@ -30,7 +30,7 @@ class DmiSubJobRequestHandlerSpec extends Specification {
def authorization = 'my authorization header'
and: 'the dmi rest client will return a response (for the correct parameters)'
def responseEntity = new ResponseEntity<>(new SubJobWriteResponse('my-sub-job-id', 'dmi1', 'prod1'), HttpStatus.OK)
- def expectedJson = '{"dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}'
+ def expectedJson = '{"destination":"d1","dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","dataJobId":"some-job-id","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}'
mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, authorization) >> responseEntity
when: 'sending request to DMI invoked'
objectUnderTest.sendRequestsToDmi(authorization, dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey)
diff --git a/csit/prepare-csit.sh b/csit/prepare-csit.sh
index fbd5dc5f0d..1b8578e0ce 100755
--- a/csit/prepare-csit.sh
+++ b/csit/prepare-csit.sh
@@ -71,7 +71,7 @@ echo "Versioning information:"
python3 --version
echo "Installing confluent kafka library for robot framework:"
-pip install robotframework-confluentkafkalibrary
+pip install robotframework-confluentkafkalibrary==2.4.0-2
pip freeze
python3 -m robot.run --version || : \ No newline at end of file
diff --git a/csit/pylibs.txt b/csit/pylibs.txt
index 32bfa6faca..3eeb1ab9ec 100644
--- a/csit/pylibs.txt
+++ b/csit/pylibs.txt
@@ -9,7 +9,7 @@ robotframework-requests==0.9.3
robotframework-selenium2library==3.0.0
robotframework-extendedselenium2library
robotframework-sshlibrary
-robotframework-confluentkafkalibrary
+robotframework-confluentkafkalibrary==2.4.0-2
scapy
# Module jsonpath is needed by current AAA idmlite suite.
jsonpath-rw
diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot
index 8f1d71aa7b..96212ff632 100644
--- a/csit/tests/cps-data-operations/cps-data-operations.robot
+++ b/csit/tests/cps-data-operations/cps-data-operations.robot
@@ -64,8 +64,7 @@ Consume cloud event from client topic
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0"
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_type" "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"
Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_correlationid" "${expectedRequestId}"
- # Need to check the root cause of this failure. To be investigated separately as part of CPS-2363
- # Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "DMI"
+ Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "DMI"
END
[Teardown] Basic Teardown ${group_id}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
index 5ce2475d7d..56d8f19e64 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
@@ -20,6 +20,8 @@
package org.onap.cps.integration.base
+import org.onap.cps.ncmp.api.datajobs.models.SubJobWriteRequest
+
import static org.onap.cps.integration.base.CpsIntegrationSpecBase.readResourceDataFile
import groovy.json.JsonSlurper
@@ -91,23 +93,23 @@ class DmiDispatcher extends Dispatcher {
case ~'^/dmi/v1/data$':
return mockResponseWithBody(HttpStatus.ACCEPTED, '{}')
- // get write sub job response
- case ~'^/dmi/v1/writeJob/(.*)$':
- return mockWriteJobResponse(request)
-
// get data job status
- case ~'^/dmi/v1/dataJob/(.*)/dataProducerJob/(.*)/status(.*)$':
+ case ~'^/dmi/v1/cmwriteJob/dataProducer/(.*)/dataProducerJob/(.*)/status$':
return mockResponseWithBody(HttpStatus.OK, '{"status":"status details from mock service"}')
+ // get write sub job response
+ case ~'^/dmi/v1/cmwriteJob(.*)$':
+ return mockWriteJobResponse(request)
+
default:
throw new IllegalArgumentException('Mock DMI does not implement endpoint ' + request.path)
}
}
def mockWriteJobResponse(request) {
- def requestId = Matcher.lastMatcher[0][1]
+ def destination = Matcher.lastMatcher[0][1]
def subJobWriteRequest = jsonSlurper.parseText(request.getBody().readUtf8())
- this.receivedSubJobs.put(requestId, subJobWriteRequest)
+ this.receivedSubJobs.put(destination, subJobWriteRequest)
def response = '{"subJobId":"some sub job id", "dmiServiceName":"some dmi service name", "dataProducerId":"some data producer id"}'
return mockResponseWithBody(HttpStatus.OK, response)
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
index fdcad2b47b..6e5c0e40c2 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
@@ -12,12 +12,11 @@ class DataJobStatusServiceSpec extends CpsIntegrationSpecBase {
def 'Get the status of a data job from DMI.'() {
given: 'the required data about the data job'
def dmiServiceName = DMI1_URL
- def requestId = 'some-request-id'
- def dataProducerJobId = 'some-data-producer-job-id'
def dataProducerId = 'some-data-producer-id'
+ def dataProducerJobId = 'some-data-producer-job-id'
def authorization = 'my authorization header'
when: 'the data job status checked'
- def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataProducerId)
+ def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId)
then: 'the status is that defined in the mock service.'
assert result == 'status details from mock service'
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
index b73634f40b..834e1399e3 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
@@ -52,7 +52,7 @@ class WriteSubJobSpec extends CpsIntegrationSpecBase {
given: 'the required input data for the write job'
def authorization = 'my authorization header'
def dataJobWriteRequest = new DataJobWriteRequest([new WriteOperation('p1', '', '', null), new WriteOperation('p2', '', '', null), new WriteOperation('p3', '', '', null)])
- def myDataJobMetadata = new DataJobMetadata('', '', '')
+ def myDataJobMetadata = new DataJobMetadata('d1', '', '')
def dataJobId = 'my-data-job-id'
when: 'sending a write job to NCMP with 2 sub-jobs for DMI 1 and 1 sub-job for DMI 2'
def response = dataJobService.writeDataJob(authorization, dataJobId, myDataJobMetadata, dataJobWriteRequest)
@@ -63,12 +63,12 @@ class WriteSubJobSpec extends CpsIntegrationSpecBase {
assert response[0].dmiServiceName == "some dmi service name"
assert response[0].dataProducerId == "some data producer id"
and: 'dmi 1 received the correct job details'
- def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['my-data-job-id']['data'].collect()
+ def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['?destination=d1']['data'].collect()
assert receivedSubJobsForDispatcher1.size() == 2
assert receivedSubJobsForDispatcher1[0]['path'] == 'p1'
assert receivedSubJobsForDispatcher1[1]['path'] == 'p2'
and: 'dmi 2 received the correct job details'
- def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['my-data-job-id']['data'].collect()
+ def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['?destination=d1']['data'].collect()
assert receivedSubJobsForDispatcher2.size() == 1
assert receivedSubJobsForDispatcher2[0]['path'] == 'p3'
}