diff options
Diffstat (limited to 'src')
15 files changed, 514 insertions, 302 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java index dbef3477..b4b0249f 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2021-2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +20,25 @@ package org.onap.cps.ncmp.dmi.exception; -public class ResourceDataNotFound extends DmiException { +import lombok.Getter; +import org.springframework.http.HttpStatus; + +@Getter +public class HttpClientRequestException extends DmiException { private static final long serialVersionUID = 881438585188332404L; - private static final String ERROR_MESSAGE = "Resource data not found for the given cmHandles: "; + private final HttpStatus httpStatus; /** * Constructor. * * @param cmHandle cmHandle identifier - * @param details the error details + * @param details response body from the client available as details + * @param httpStatus http status from the client */ - public ResourceDataNotFound(final String cmHandle, final String details) { - super(ERROR_MESSAGE + cmHandle, details); + public HttpClientRequestException(final String cmHandle, final String details, final HttpStatus httpStatus) { + super("Resource data request failed for CM Handle: " + cmHandle, details); + this.httpStatus = httpStatus; } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java new file mode 100644 index 00000000..7189f6c9 --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async; + +import com.google.gson.JsonObject; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException; +import org.onap.cps.ncmp.dmi.model.DataAccessRequest; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AsyncTaskExecutor { + + private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer; + + private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator = + new DmiAsyncRequestResponseEventCreator(); + + private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6); + + static { + operationToHttpStatusMap.put(null, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT); + } + + /** + * Execute task asynchronously and publish response to supplied topic. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param topicName topic name where message need to be published + * @param requestId unique requestId for async request + * @param operation the operation performed + * @param timeOutInMilliSeconds task timeout in milliseconds + */ + public void executeAsyncTask(final Supplier<String> taskSupplier, + final String topicName, + final String requestId, + final DataAccessRequest.OperationEnum operation, + final int timeOutInMilliSeconds) { + CompletableFuture.supplyAsync(taskSupplier::get) + .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS) + .whenCompleteAsync((resourceDataAsJson, throwable) -> { + if (throwable == null) { + final String status = operationToHttpStatusMap.get(operation).getReasonPhrase(); + final String code = String.valueOf(operationToHttpStatusMap.get(operation).value()); + publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code); + } else { + log.error("Error occurred with async request {}", throwable.getMessage()); + publishAsyncFailureEvent(topicName, requestId, operation, throwable); + } + }); + log.info("Async task completed."); + } + + private void publishAsyncEvent(final String topicName, + final String requestId, + final String resourceDataAsJson, + final String status, + final String code) { + final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent = + dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code); + + dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent); + } + + protected void publishAsyncFailureEvent(final String topicName, + final String requestId, + final DataAccessRequest.OperationEnum operation, + final Throwable throwable) { + HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR; + + if (throwable instanceof HttpClientRequestException) { + final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable; + httpStatus = httpClientRequestException.getHttpStatus(); + } + + final JsonObject errorDetails = new JsonObject(); + errorDetails.addProperty("errorDetails", throwable.getMessage()); + publishAsyncEvent( + topicName, + requestId, + errorDetails.toString(), + httpStatus.getReasonPhrase(), + String.valueOf(httpStatus.value()) + ); + } +} + + + diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java new file mode 100644 index 00000000..de1fc95a --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.UUID; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.Application; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.EventContent; + +/** + * Helper to create DmiAsyncRequestResponseEvent. + */ +@Slf4j +public class DmiAsyncRequestResponseEventCreator { + + private static final DateTimeFormatter dateTimeFormatter + = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Create an event. + * + * @param resourceDataAsJson the resource data as json + * @param topicParamInQuery the topic to send response to + * @param requestId the request id + * @param status the status of the request + * @param code the code of the response + * + * @return DmiAsyncRequestResponseEvent + */ + public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson, + final String topicParamInQuery, + final String requestId, + final String status, + final String code) { + final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent(); + + dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString()); + dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId); + dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName()); + dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1"); + dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName()); + dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery); + dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter)); + dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code)); + + return dmiAsyncRequestResponseEvent; + } + + @SneakyThrows + private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) { + final EventContent eventContent = new EventContent(); + + eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1"); + eventContent.setResponseStatus(status); + eventContent.setResponseCode(code); + + eventContent.setAdditionalProperty("response-data", + objectMapper.readValue(resourceDataAsJson, HashMap.class)); + + return eventContent; + } + +} diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java index f5e1839b..00fea330 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java @@ -18,27 +18,30 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.dmi.service; +package org.onap.cps.ncmp.dmi.notifications.async; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; -@Slf4j @Service -@AllArgsConstructor -public class NcmpKafkaPublisherService { +@RequiredArgsConstructor +public class DmiAsyncRequestResponseEventProducer { - private final NcmpKafkaPublisher ncmpKafkaPublisher; + private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate; + + @Value("${app.ncmp.async.topic}") + private String dmiNcmpTopic; /** - * publish the message to NCMP. + * Sends message to the configured topic with a message key. * - * @param messageKey message key - * @param message message payload + * @param requestId the request id + * @param dmiAsyncRequestResponseEvent the event to publish */ - public void publishToNcmp(final String messageKey, final Object message) { - log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey); - ncmpKafkaPublisher.sendMessage(messageKey, message); + public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) { + kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent); } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java index 4dbe852d..bdd1fff6 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java @@ -37,11 +37,12 @@ import org.onap.cps.ncmp.dmi.model.ModuleReferencesRequest; import org.onap.cps.ncmp.dmi.model.ModuleResourcesReadRequest; import org.onap.cps.ncmp.dmi.model.ModuleSet; import org.onap.cps.ncmp.dmi.model.YangResources; +import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor; import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi; import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi; import org.onap.cps.ncmp.dmi.service.DmiService; -import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService; import org.onap.cps.ncmp.dmi.service.model.ModuleReference; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; @@ -54,13 +55,13 @@ import org.springframework.web.bind.annotation.RestController; public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { private final DmiService dmiService; - private final ObjectMapper objectMapper; - - private final NcmpKafkaPublisherService ncmpKafkaPublisherService; - + private final AsyncTaskExecutor asyncTaskExecutor; private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6); + @Value("${notification.async.executor.time-out-value-in-ms:2000}") + private int timeOutInMillis; + static { operationToHttpStatusMap.put(null, HttpStatus.OK); operationToHttpStatusMap.put(OperationEnum.READ, HttpStatus.OK); @@ -70,10 +71,9 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT); } - @Override public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle, - final @Valid ModuleReferencesRequest body) { + final @Valid ModuleReferencesRequest body) { // For onap-dmi-plugin we don't need cmHandleProperties, so DataAccessReadRequest is not used. final ModuleSet moduleSet = dmiService.getModulesForCmHandle(cmHandle); return ResponseEntity.ok(moduleSet); @@ -104,66 +104,119 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { } /** - * This method fetches the resource for given cm handle using pass through operational. It filters the response on - * the basis of options query parameters and returns response. Does not support write operations. + * This method fetches the resource for given cm handle using pass through operational datastore. It filters the + * response on the basis of options query parameters and returns response. Does not support write operations. * * @param resourceIdentifier resource identifier to fetch data * @param cmHandle cm handle identifier * @param dataAccessRequest data Access Request * @param optionsParamInQuery options query parameter - * @param topicParamInQuery optional topic parameter + * @param topicParamInQuery topic name for (triggering) async responses * @return {@code ResponseEntity} response entity */ @Override public ResponseEntity<Object> dataAccessPassthroughOperational(final String resourceIdentifier, final String cmHandle, - final @Valid DataAccessRequest - dataAccessRequest, + final @Valid DataAccessRequest dataAccessRequest, final @Valid String optionsParamInQuery, final String topicParamInQuery) { if (isReadOperation(dataAccessRequest)) { - final String resourceDataAsJson = dmiService.getResourceData(cmHandle, - resourceIdentifier, - optionsParamInQuery, - DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM); + if (hasTopic(topicParamInQuery)) { + return handleAsyncRequest(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery, + topicParamInQuery); + } + + final String resourceDataAsJson = dmiService.getResourceData(cmHandle, resourceIdentifier, + optionsParamInQuery, DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM); return ResponseEntity.ok(resourceDataAsJson); } return new ResponseEntity<>(HttpStatus.BAD_REQUEST); } + /** + * This method fetches the resource for given cm handle using pass through running datastore. It filters the + * response on the basis of options query parameters and returns response. It supports both read and write + * operation. + * + * @param resourceIdentifier resource identifier to fetch data + * @param cmHandle cm handle identifier + * @param dataAccessRequest data Access Request + * @param optionsParamInQuery options query parameter + * @param topicParamInQuery topic name for (triggering) async responses + * @return {@code ResponseEntity} response entity + */ @Override public ResponseEntity<Object> dataAccessPassthroughRunning(final String resourceIdentifier, final String cmHandle, - final @Valid DataAccessRequest - dataAccessRequest, + final @Valid DataAccessRequest dataAccessRequest, final @Valid String optionsParamInQuery, final String topicParamInQuery) { - final String sdncResponse; - if (isReadOperation(dataAccessRequest)) { - sdncResponse = dmiService.getResourceData(cmHandle, - resourceIdentifier, - optionsParamInQuery, - DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM); - } else { - sdncResponse = dmiService.writeData( + if (hasTopic(topicParamInQuery)) { + asyncTaskExecutor.executeAsyncTask(() -> + getSdncResponseForPassThroughRunning( + resourceIdentifier, + cmHandle, + dataAccessRequest, + optionsParamInQuery), + topicParamInQuery, + dataAccessRequest.getRequestId(), dataAccessRequest.getOperation(), - cmHandle, - resourceIdentifier, - dataAccessRequest.getDataType(), - dataAccessRequest.getData()); + timeOutInMillis + ); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); } + + final String sdncResponse = + getSdncResponseForPassThroughRunning(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery); return new ResponseEntity<>(sdncResponse, operationToHttpStatusMap.get(dataAccessRequest.getOperation())); } + private String getSdncResponseForPassThroughRunning(final String resourceIdentifier, + final String cmHandle, + final DataAccessRequest dataAccessRequest, + final String optionsParamInQuery) { + if (isReadOperation(dataAccessRequest)) { + return dmiService.getResourceData(cmHandle, resourceIdentifier, optionsParamInQuery, + DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM); + } + + return dmiService.writeData(dataAccessRequest.getOperation(), cmHandle, resourceIdentifier, + dataAccessRequest.getDataType(), dataAccessRequest.getData()); + } + private boolean isReadOperation(final @Valid DataAccessRequest dataAccessRequest) { return dataAccessRequest.getOperation() == null || dataAccessRequest.getOperation().equals(DataAccessRequest.OperationEnum.READ); } private List<ModuleReference> convertRestObjectToJavaApiObject( - final ModuleResourcesReadRequest moduleResourcesReadRequest) { + final ModuleResourcesReadRequest moduleResourcesReadRequest) { return objectMapper .convertValue(moduleResourcesReadRequest.getData().getModules(), - new TypeReference<List<ModuleReference>>() {}); + new TypeReference<List<ModuleReference>>() {}); + } + + private boolean hasTopic(final String topicParamInQuery) { + return !(topicParamInQuery == null || topicParamInQuery.isBlank()); } + + private ResponseEntity<Object> handleAsyncRequest(final String resourceIdentifier, + final String cmHandle, + final DataAccessRequest dataAccessRequest, + final String optionsParamInQuery, + final String topicParamInQuery) { + asyncTaskExecutor.executeAsyncTask(() -> + dmiService.getResourceData( + cmHandle, + resourceIdentifier, + optionsParamInQuery, + DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM), + topicParamInQuery, + dataAccessRequest.getRequestId(), + dataAccessRequest.getOperation(), + timeOutInMillis + ); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } + } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java index 22d47442..753d16f7 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java @@ -33,9 +33,9 @@ import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties; import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException; import org.onap.cps.ncmp.dmi.exception.DmiException; +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException; import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException; import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException; -import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound; import org.onap.cps.ncmp.dmi.model.DataAccessRequest; import org.onap.cps.ncmp.dmi.model.ModuleSet; import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas; @@ -59,8 +59,6 @@ public class DmiServiceImpl implements DmiService { private NcmpRestClient ncmpRestClient; private ObjectMapper objectMapper; private DmiPluginProperties dmiPluginProperties; - private static final String RESPONSE_CODE = "response code : "; - private static final String MESSAGE = " message : "; /** * Constructor. @@ -107,8 +105,8 @@ public class DmiServiceImpl implements DmiService { "SDNC did not return a module resource for the given cmHandle."); } else { log.error("Error occurred when getting module resources from SDNC for the given cmHandle {}", cmHandle); - throw new DmiException(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); + throw new HttpClientRequestException( + cmHandle, responseEntity.getBody(), responseEntity.getStatusCode()); } } return yangResources; @@ -166,20 +164,14 @@ public class DmiServiceImpl implements DmiService { final String dataType, final String data) { final ResponseEntity<String> responseEntity = sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data); - if (responseEntity.getStatusCode().is2xxSuccessful()) { - return responseEntity.getBody(); - } else { - throw new DmiException(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); - } + return prepareAndSendResponse(responseEntity, cmHandle); } private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) { - if (responseEntity.getStatusCode() == HttpStatus.OK) { + if (responseEntity.getStatusCode().is2xxSuccessful()) { return responseEntity.getBody(); } else { - throw new ResourceDataNotFound(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); + throw new HttpClientRequestException(cmHandle, responseEntity.getBody(), responseEntity.getStatusCode()); } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java deleted file mode 100644 index 373a09d7..00000000 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.stereotype.Component; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; - -@Component -@Slf4j -public class NcmpKafkaPublisher { - - private final KafkaTemplate<String, Object> kafkaTemplate; - private final String topicName; - - /** - * KafkaTemplate and Topic name. - * - * @param kafkaTemplate kafka template - * @param topicName topic name - */ - @Autowired - public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate, - @Value("${app.ncmp.async-m2m.topic}") final String topicName) { - this.kafkaTemplate = kafkaTemplate; - this.topicName = topicName; - } - - /** - * Sends message to the configured topic with a message key. - * - * @param messageKey message key - * @param payload message payload - */ - public void sendMessage(final String messageKey, final Object payload) { - final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload); - send.addCallback(new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable ex) { - log.warn("Failed to send the messages {}", ex.getMessage()); - } - - @Override - public void onSuccess(final SendResult<String, Object> result) { - log.debug("Sent message {}", result.getProducerRecord()); - } - }); - } -} diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java index cf7b4599..179707ab 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2021-2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,9 +69,10 @@ public class SdncRestconfClient { * @param httpHeaders HTTP Headers * @return response entity */ - public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, final String resourceUrl, - final String jsonData, - final HttpHeaders httpHeaders) { + public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, + final String resourceUrl, + final String jsonData, + final HttpHeaders httpHeaders) { final String sdncBaseUrl = sdncProperties.getBaseUrl(); final String sdncRestconfUrl = sdncBaseUrl.concat(resourceUrl); httpHeaders.setBasicAuth(sdncProperties.getAuthUsername(), sdncProperties.getAuthPassword()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8be97d2d..6ad9d58d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -45,19 +45,24 @@ spring: pathmatch: matching-strategy: ANT_PATH_MATCHER kafka: - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092} security: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - client-id: dmi-plugin + client-id: ncmp-dmi-plugin app: ncmp: - async-m2m: + async: topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} +notification: + async: + executor: + time-out-value-in-ms: 2000 + # Actuator management: server: @@ -106,4 +111,4 @@ springdoc: urlsPrimaryName: query urls: - name: query - url: /api-docs/openapi.yaml
\ No newline at end of file + url: /api-docs/openapi.yaml diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy new file mode 100644 index 00000000..54c0fe09 --- /dev/null +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the 'License'); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException +import org.onap.cps.ncmp.dmi.model.DataAccessRequest +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent +import org.spockframework.spring.SpringBean +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.HttpStatus +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.spock.Testcontainers +import org.testcontainers.utility.DockerImageName +import spock.lang.Specification + +import java.time.Duration + +@SpringBootTest(classes = [AsyncTaskExecutor, DmiAsyncRequestResponseEventProducer]) +@Testcontainers +@DirtiesContext +class AsyncTaskExecutorIntegrationSpec extends Specification { + + static kafkaTestContainer = new KafkaContainer( + DockerImageName.parse('confluentinc/cp-kafka:6.2.1') + ) + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + def producerConfigProperties = [ + 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0], + 'retries' : 0, + 'batch.size' : 16384, + 'linger.ms' : 1, + 'buffer.memory' : 33554432, + 'key.serializer' : StringSerializer, + 'value.serializer' : JsonSerializer + ] + + def consumerConfigProperties = [ + 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0], + 'key.deserializer' : StringDeserializer, + 'value.deserializer': StringDeserializer, + 'auto.offset.reset' : 'earliest', + 'group.id' : 'test' + ] + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties)) + + @SpringBean + DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = + new DmiAsyncRequestResponseEventProducer(kafkaTemplate) + + KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfigProperties) + + def spiedObjectMapper = Spy(ObjectMapper) + + def objectUnderTest = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer) + + private static final String TEST_TOPIC = 'test-topic' + + def setup() { + cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC + consumer.subscribe([TEST_TOPIC] as List<String>) + } + + def cleanup() { + consumer.close() + } + + def 'Publish and Subscribe message - success'() { + when: 'a successful event is published' + objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200') + and: 'the topic is polled' + def records = consumer.poll(Duration.ofMillis(1500)) + then: 'the record received is the event sent' + def record = records.iterator().next() + DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) + and: 'the status & code matches expected' + assert event.getEventContent().getResponseStatus() == 'OK' + assert event.getEventContent().getResponseCode() == '200' + } + + def 'Publish and Subscribe message - failure'() { + when: 'a failure event is published' + def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR) + objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', DataAccessRequest.OperationEnum.READ, exception) + and: 'the topic is polled' + def records = consumer.poll(Duration.ofMillis(1500)) + then: 'the record received is the event sent' + def record = records.iterator().next() + DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) + and: 'the status & code matches expected' + assert event.getEventContent().getResponseStatus() == 'Internal Server Error' + assert event.getEventContent().getResponseCode() == '500' + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + +}
\ No newline at end of file diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy index 1541f8ca..5bfbc400 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy @@ -21,11 +21,14 @@ package org.onap.cps.ncmp.dmi.rest.controller + import org.onap.cps.ncmp.dmi.TestUtils import org.onap.cps.ncmp.dmi.exception.DmiException import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException -import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService +import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor +import org.onap.cps.ncmp.dmi.notifications.async.DmiAsyncRequestResponseEventProducer + import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.model.ModuleSet import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas @@ -38,6 +41,7 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest import org.springframework.http.HttpStatus import org.springframework.http.MediaType +import org.springframework.kafka.core.KafkaTemplate import org.springframework.security.test.context.support.WithMockUser import org.springframework.test.web.servlet.MockMvc import spock.lang.Specification @@ -53,7 +57,7 @@ import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.UPDATE import static org.springframework.http.HttpStatus.CREATED import static org.springframework.http.HttpStatus.OK -@WebMvcTest(DmiRestController) +@WebMvcTest(DmiRestController.class) @WithMockUser class DmiRestControllerSpec extends Specification { @@ -64,7 +68,10 @@ class DmiRestControllerSpec extends Specification { DmiService mockDmiService = Mock() @SpringBean - NcmpKafkaPublisherService mockNcmpKafkaPublisherService = Mock() + DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = new DmiAsyncRequestResponseEventProducer(Mock(KafkaTemplate)) + + @SpringBean + AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer) @Value('${rest.api.dmi-base-path}/v1') def basePathV1 @@ -256,6 +263,21 @@ class DmiRestControllerSpec extends Specification { response.getContentAsString() == '{some-json}' } + def 'PassThrough Returns OK when topic is used for async'(){ + given: 'an endpoint' + def readPassThroughUrl ="${basePathV1}/ch/some-cmHandle/data/ds/ncmp-datastore:" + + resourceIdentifier + + '?resourceIdentifier=some-resourceIdentifier&topic=test-topic' + when: 'endpoint is invoked' + def jsonData = TestUtils.getResourceFileContent('readData.json') + def response = mvc.perform( + post(readPassThroughUrl).contentType(MediaType.APPLICATION_JSON).content(jsonData) + ).andReturn().response + then: 'response status is OK' + assert response.status == HttpStatus.NO_CONTENT.value() + where: 'the following values are used' + resourceIdentifier << ['passthrough-operational', 'passthrough-running'] + } def 'Get resource data for pass-through running with #scenario value in resource identifier param.'() { given: 'Get resource data url' diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy index e38d5c37..1d87b775 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy @@ -29,7 +29,7 @@ import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException import org.onap.cps.ncmp.dmi.exception.DmiException import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException -import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.model.YangResource import org.onap.cps.ncmp.dmi.model.YangResources @@ -221,7 +221,7 @@ class DmiServiceImplSpec extends Specification { objectUnderTest.getResourceData(cmHandle, resourceId, optionsParam, restConfQueryParam) then: 'resource data not found' - thrown(ResourceDataNotFound.class) + thrown(HttpClientRequestException.class) } def 'Get resource data for passthrough running.'() { diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy deleted file mode 100644 index f5bc4ac4..00000000 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service - -import spock.lang.Specification - -class NcmpKafkaPublisherServiceSpec extends Specification { - - def mockNcmpKafkaPublisher = Mock(NcmpKafkaPublisher) - def objectUnderTest = new NcmpKafkaPublisherService(mockNcmpKafkaPublisher) - - def 'Message publishing'() { - given: 'a sample message with key' - def message = 'sample message' - def messageKey = 'sample-key' - when: 'published' - objectUnderTest.publishToNcmp(messageKey, message) - then: 'no exception is thrown' - noExceptionThrown() - and: 'message is published once' - 1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message) - } -} diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy deleted file mode 100644 index 00c8e6e7..00000000 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service - -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.StringDeserializer -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.spock.Testcontainers -import spock.lang.Specification - -import java.time.Duration - -import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - -@SpringBootTest -@Testcontainers -@DirtiesContext -class NcmpKafkaPublisherSpec extends Specification { - - static kafkaTestContainer = new KafkaContainer() - static { - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) - } - - def setupSpec() { - kafkaTestContainer.start() - } - - @Autowired - KafkaTemplate<String, Object> kafkaTemplate - - @Value('${app.ncmp.async-m2m.topic}') - String topic - - KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig()) - - def 'Publish and Subscribe message'() { - given: 'a sample messsage and key' - def message = 'sample message' - def messageKey = 'message-key' - def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic) - when: 'a message is published' - objectUnderTest.sendMessage(messageKey, message) - then: 'a message is consumed' - consumer.subscribe([topic] as List<String>) - def records = consumer.poll(Duration.ofMillis(1000)) - assert records.size() == 1 - assert messageKey == records[0].key - assert message == records[0].value - } - - def kafkaConsumerConfig() { - def configs = [:] - configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name) - configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name) - configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest') - configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0]) - configs.put(GROUP_ID_CONFIG, 'test') - return configs - } - - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry registry) { - registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) - } -} - -@Configuration -class TopicConfig { - @Bean - NewTopic newTopic() { - return new NewTopic("my-topic-name", 1, (short) 1); - } -} diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 0d3784f9..9ed37a73 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -47,19 +47,22 @@ dmi: spring: application: name: ncmp-dmi-plugin + mvc: + pathmatch: + matching-strategy: ANT_PATH_MATCHER kafka: - bootstrap-servers: localhost:9092 + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} security: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - client-id: dmi-plugin + client-id: ncmp-dmi-plugin app: ncmp: - async-m2m: - topic: my-topic-name + async: + topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} logging: format: json |