diff options
13 files changed, 50 insertions, 46 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java index 50d96f858c..9cfc49f1d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java @@ -31,15 +31,13 @@ public interface DataJobStatusService { * * @param authorization The authorization header from the REST request. * @param dmiServiceName The name of the DMI Service relevant to the data job. - * @param requestId The unique identifier for the overall data job request. - * @param dataProducerJobId The identifier of the data producer job within the DMI system. * @param dataProducerId The ID of the producer registered by DMI, used for operations related to this request. * This could include alternate IDs or specific identifiers. + * @param dataProducerJobId The identifier of the data producer job within the DMI system. * @return The current status of the data job as a String. */ String getDataJobStatus(final String authorization, final String dmiServiceName, - final String requestId, - final String dataProducerJobId, - final String dataProducerId); + final String dataProducerId, + final String dataProducerJobId); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java index 0e0498ec49..a7a6573279 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java @@ -25,17 +25,20 @@ import java.util.Collection; /** * Request data for a write operation by the DMI Plugin. * + * @param destination The destination of the results. ( e.g. S3 Bucket) * @param dataAcceptType Define the data response accept type. * e.g. "application/vnd.3gpp.object-tree-hierarchical+json", * "application/vnd.3gpp.object-tree-flat+json" etc. * @param dataContentType Define the data request content type. * e.g. "application/3gpp-json-patch+json" etc. * @param dataProducerId Identifier of the data producer. - * + * @param dataJobId Identifier for the overall Datajob * @param data A collection of outgoing write operations. */ public record SubJobWriteRequest ( + String destination, String dataAcceptType, String dataContentType, String dataProducerId, + String dataJobId, Collection<DmiWriteOperation> data) {}
\ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java index a6ecaa1097..fb17f066ce 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java @@ -42,26 +42,25 @@ public class DataJobStatusServiceImpl implements DataJobStatusService { @Override public String getDataJobStatus(final String authorization, final String dmiServiceName, - final String requestId, - final String dataProducerJobId, - final String dataProducerId) { + final String dataProducerId, + final String dataProducerJobId) { - final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId, - dataProducerJobId, dataProducerId); + final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, + dataProducerId, + dataProducerJobId); return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block(); } private UrlTemplateParameters buildUrlParameters(final String dmiServiceName, - final String requestId, - final String dataProducerJobId, - final String dataProducerId) { + final String dataProducerId, + final String dataProducerJobId) { return DmiServiceUrlTemplateBuilder.newInstance() - .fixedPathSegment("dataJob") - .variablePathSegment("requestId", requestId) + .fixedPathSegment("cmwriteJob") + .fixedPathSegment("dataProducer") + .variablePathSegment("dataProducerId", dataProducerId) .fixedPathSegment("dataProducerJob") .variablePathSegment("dataProducerJobId", dataProducerJobId) .fixedPathSegment("status") - .queryParameter("dataProducerId", dataProducerId) .createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java index c93709ce75..0d14dace5e 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java @@ -64,10 +64,15 @@ public class DmiSubJobRequestHandler { final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) { final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size()); dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> { - final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.dataAcceptType(), - dataJobMetadata.dataContentType(), producerKey.dataProducerIdentifier(), dmi3ggpWriteOperations); + final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.destination(), + dataJobMetadata.dataAcceptType(), + dataJobMetadata.dataContentType(), + producerKey.dataProducerIdentifier(), + dataJobId, + dmi3ggpWriteOperations); - final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobId, producerKey); + final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobMetadata.destination(), + producerKey); final ResponseEntity<Object> responseEntity = dmiRestClient.synchronousPostOperationWithJsonData( RequiredDmiService.DATA, urlTemplateParameters, @@ -82,10 +87,10 @@ public class DmiSubJobRequestHandler { return subJobWriteResponses; } - private UrlTemplateParameters getUrlTemplateParameters(final String dataJobId, final ProducerKey producerKey) { + private UrlTemplateParameters getUrlTemplateParameters(final String destination, final ProducerKey producerKey) { return DmiServiceUrlTemplateBuilder.newInstance() - .fixedPathSegment("writeJob") - .variablePathSegment("requestId", dataJobId) + .fixedPathSegment("cmwriteJob") + .queryParameter("destination", destination) .createUrlTemplateParameters(producerKey.dmiServiceName(), dmiProperties.getDmiBasePath()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java index ba6bba9c53..177b4b0bf2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java @@ -160,7 +160,7 @@ public class DmiRestClient { .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization)) .retrieve() .bodyToMono(JsonNode.class) - .map(responseHealthStatus -> responseHealthStatus.path("status").asText()) + .map(jsonNode -> jsonNode.path("status").asText()) .onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName())); } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy index cc042988f6..d231dfa755 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy @@ -39,15 +39,14 @@ class DataJobStatusServiceImplSpec extends Specification { def 'Forward a data job status query to DMI.' () { given: 'the required parameters for querying' def dmiServiceName = 'some-dmi-service' - def requestId = 'some-request-id' + def dataProducerId = 'some-data-producer-id' def dataProducerJobId = 'some-data-producer-job-id' - def dataJobId = 'some-data-job-id' def authorization = 'my authorization header' - def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/dataJob/{requestId}/dataProducerJob/{dataProducerJobId}/status?dataProducerId={dataProducerId}', ['dataProducerJobId':'some-data-producer-job-id', 'dataProducerId':'some-data-job-id', 'requestId':'some-request-id']) + def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/cmwriteJob/dataProducer/{dataProducerId}/dataProducerJob/{dataProducerJobId}/status', ['dataProducerId':'some-data-producer-id', 'dataProducerJobId':'some-data-producer-job-id']) and: 'the rest client returns a status for the given parameters' mockDmiRestClient.getDataJobStatus(urlParams, authorization) >> Mono.just('some status') when: 'the job status is queried' - def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataJobId) + def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId) then: 'the status from the rest client is returned' assert status == 'some status' } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy index b3dd02dec3..041fbd95ee 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy @@ -30,7 +30,7 @@ class DmiSubJobRequestHandlerSpec extends Specification { def authorization = 'my authorization header' and: 'the dmi rest client will return a response (for the correct parameters)' def responseEntity = new ResponseEntity<>(new SubJobWriteResponse('my-sub-job-id', 'dmi1', 'prod1'), HttpStatus.OK) - def expectedJson = '{"dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}' + def expectedJson = '{"destination":"d1","dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","dataJobId":"some-job-id","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}' mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, authorization) >> responseEntity when: 'sending request to DMI invoked' objectUnderTest.sendRequestsToDmi(authorization, dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey) diff --git a/csit/prepare-csit.sh b/csit/prepare-csit.sh index fbd5dc5f0d..1b8578e0ce 100755 --- a/csit/prepare-csit.sh +++ b/csit/prepare-csit.sh @@ -71,7 +71,7 @@ echo "Versioning information:" python3 --version echo "Installing confluent kafka library for robot framework:" -pip install robotframework-confluentkafkalibrary +pip install robotframework-confluentkafkalibrary==2.4.0-2 pip freeze python3 -m robot.run --version || :
\ No newline at end of file diff --git a/csit/pylibs.txt b/csit/pylibs.txt index 32bfa6faca..3eeb1ab9ec 100644 --- a/csit/pylibs.txt +++ b/csit/pylibs.txt @@ -9,7 +9,7 @@ robotframework-requests==0.9.3 robotframework-selenium2library==3.0.0 robotframework-extendedselenium2library robotframework-sshlibrary -robotframework-confluentkafkalibrary +robotframework-confluentkafkalibrary==2.4.0-2 scapy # Module jsonpath is needed by current AAA idmlite suite. jsonpath-rw diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot index 8f1d71aa7b..96212ff632 100644 --- a/csit/tests/cps-data-operations/cps-data-operations.robot +++ b/csit/tests/cps-data-operations/cps-data-operations.robot @@ -64,8 +64,7 @@ Consume cloud event from client topic Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_specversion" "1.0" Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_type" "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent" Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_correlationid" "${expectedRequestId}" - # Need to check the root cause of this failure. To be investigated separately as part of CPS-2363 - # Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "DMI" + Compare Header Values ${header_key_value_pair[0]} ${header_key_value_pair[1]} "ce_source" "DMI" END [Teardown] Basic Teardown ${group_id} diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy index 5ce2475d7d..56d8f19e64 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy @@ -20,6 +20,8 @@ package org.onap.cps.integration.base +import org.onap.cps.ncmp.api.datajobs.models.SubJobWriteRequest + import static org.onap.cps.integration.base.CpsIntegrationSpecBase.readResourceDataFile import groovy.json.JsonSlurper @@ -91,23 +93,23 @@ class DmiDispatcher extends Dispatcher { case ~'^/dmi/v1/data$': return mockResponseWithBody(HttpStatus.ACCEPTED, '{}') - // get write sub job response - case ~'^/dmi/v1/writeJob/(.*)$': - return mockWriteJobResponse(request) - // get data job status - case ~'^/dmi/v1/dataJob/(.*)/dataProducerJob/(.*)/status(.*)$': + case ~'^/dmi/v1/cmwriteJob/dataProducer/(.*)/dataProducerJob/(.*)/status$': return mockResponseWithBody(HttpStatus.OK, '{"status":"status details from mock service"}') + // get write sub job response + case ~'^/dmi/v1/cmwriteJob(.*)$': + return mockWriteJobResponse(request) + default: throw new IllegalArgumentException('Mock DMI does not implement endpoint ' + request.path) } } def mockWriteJobResponse(request) { - def requestId = Matcher.lastMatcher[0][1] + def destination = Matcher.lastMatcher[0][1] def subJobWriteRequest = jsonSlurper.parseText(request.getBody().readUtf8()) - this.receivedSubJobs.put(requestId, subJobWriteRequest) + this.receivedSubJobs.put(destination, subJobWriteRequest) def response = '{"subJobId":"some sub job id", "dmiServiceName":"some dmi service name", "dataProducerId":"some data producer id"}' return mockResponseWithBody(HttpStatus.OK, response) } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy index fdcad2b47b..6e5c0e40c2 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy @@ -12,12 +12,11 @@ class DataJobStatusServiceSpec extends CpsIntegrationSpecBase { def 'Get the status of a data job from DMI.'() { given: 'the required data about the data job' def dmiServiceName = DMI1_URL - def requestId = 'some-request-id' - def dataProducerJobId = 'some-data-producer-job-id' def dataProducerId = 'some-data-producer-id' + def dataProducerJobId = 'some-data-producer-job-id' def authorization = 'my authorization header' when: 'the data job status checked' - def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataProducerId) + def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId) then: 'the status is that defined in the mock service.' assert result == 'status details from mock service' } diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy index b73634f40b..834e1399e3 100644 --- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy +++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy @@ -52,7 +52,7 @@ class WriteSubJobSpec extends CpsIntegrationSpecBase { given: 'the required input data for the write job' def authorization = 'my authorization header' def dataJobWriteRequest = new DataJobWriteRequest([new WriteOperation('p1', '', '', null), new WriteOperation('p2', '', '', null), new WriteOperation('p3', '', '', null)]) - def myDataJobMetadata = new DataJobMetadata('', '', '') + def myDataJobMetadata = new DataJobMetadata('d1', '', '') def dataJobId = 'my-data-job-id' when: 'sending a write job to NCMP with 2 sub-jobs for DMI 1 and 1 sub-job for DMI 2' def response = dataJobService.writeDataJob(authorization, dataJobId, myDataJobMetadata, dataJobWriteRequest) @@ -63,12 +63,12 @@ class WriteSubJobSpec extends CpsIntegrationSpecBase { assert response[0].dmiServiceName == "some dmi service name" assert response[0].dataProducerId == "some data producer id" and: 'dmi 1 received the correct job details' - def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['my-data-job-id']['data'].collect() + def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['?destination=d1']['data'].collect() assert receivedSubJobsForDispatcher1.size() == 2 assert receivedSubJobsForDispatcher1[0]['path'] == 'p1' assert receivedSubJobsForDispatcher1[1]['path'] == 'p2' and: 'dmi 2 received the correct job details' - def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['my-data-job-id']['data'].collect() + def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['?destination=d1']['data'].collect() assert receivedSubJobsForDispatcher2.size() == 1 assert receivedSubJobsForDispatcher2[0]['path'] == 'p3' } |