diff options
author | JosephKeenan <joseph.keenan@est.tech> | 2022-05-23 15:43:05 +0100 |
---|---|---|
committer | JosephKeenan <joseph.keenan@est.tech> | 2022-06-01 16:56:50 +0100 |
commit | 2cd8b98223bd49975fcca0ec7f1d4673a4163074 (patch) | |
tree | d26e809ad7befaf8b7815271a6fcc501f67b3117 /src/main | |
parent | df26bc38a75f10650ce5785cdc9bd7b9516f6f25 (diff) |
Async request response dmi -> NCMP
-Added Async for passthrough running and operational
-Build will fail until cps is merged https://gerrit.onap.org/r/c/cps/+/128685
Issue-ID: CPS-830
Change-Id: Iedbfab109f5cd777a5be8eed7414758d0f5ec05c
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java) | 18 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java | 122 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java | 89 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java) | 29 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java | 117 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java | 20 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java | 72 | ||||
-rw-r--r-- | src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java | 9 | ||||
-rw-r--r-- | src/main/resources/application.yml | 13 |
9 files changed, 344 insertions, 145 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java index dbef3477..b4b0249f 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2021-2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +20,25 @@ package org.onap.cps.ncmp.dmi.exception; -public class ResourceDataNotFound extends DmiException { +import lombok.Getter; +import org.springframework.http.HttpStatus; + +@Getter +public class HttpClientRequestException extends DmiException { private static final long serialVersionUID = 881438585188332404L; - private static final String ERROR_MESSAGE = "Resource data not found for the given cmHandles: "; + private final HttpStatus httpStatus; /** * Constructor. * * @param cmHandle cmHandle identifier - * @param details the error details + * @param details response body from the client available as details + * @param httpStatus http status from the client */ - public ResourceDataNotFound(final String cmHandle, final String details) { - super(ERROR_MESSAGE + cmHandle, details); + public HttpClientRequestException(final String cmHandle, final String details, final HttpStatus httpStatus) { + super("Resource data request failed for CM Handle: " + cmHandle, details); + this.httpStatus = httpStatus; } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java new file mode 100644 index 00000000..7189f6c9 --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async; + +import com.google.gson.JsonObject; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException; +import org.onap.cps.ncmp.dmi.model.DataAccessRequest; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AsyncTaskExecutor { + + private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer; + + private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator = + new DmiAsyncRequestResponseEventCreator(); + + private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6); + + static { + operationToHttpStatusMap.put(null, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK); + operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT); + } + + /** + * Execute task asynchronously and publish response to supplied topic. + * + * @param taskSupplier functional method is get() task need to executed asynchronously + * @param topicName topic name where message need to be published + * @param requestId unique requestId for async request + * @param operation the operation performed + * @param timeOutInMilliSeconds task timeout in milliseconds + */ + public void executeAsyncTask(final Supplier<String> taskSupplier, + final String topicName, + final String requestId, + final DataAccessRequest.OperationEnum operation, + final int timeOutInMilliSeconds) { + CompletableFuture.supplyAsync(taskSupplier::get) + .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS) + .whenCompleteAsync((resourceDataAsJson, throwable) -> { + if (throwable == null) { + final String status = operationToHttpStatusMap.get(operation).getReasonPhrase(); + final String code = String.valueOf(operationToHttpStatusMap.get(operation).value()); + publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code); + } else { + log.error("Error occurred with async request {}", throwable.getMessage()); + publishAsyncFailureEvent(topicName, requestId, operation, throwable); + } + }); + log.info("Async task completed."); + } + + private void publishAsyncEvent(final String topicName, + final String requestId, + final String resourceDataAsJson, + final String status, + final String code) { + final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent = + dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code); + + dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent); + } + + protected void publishAsyncFailureEvent(final String topicName, + final String requestId, + final DataAccessRequest.OperationEnum operation, + final Throwable throwable) { + HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR; + + if (throwable instanceof HttpClientRequestException) { + final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable; + httpStatus = httpClientRequestException.getHttpStatus(); + } + + final JsonObject errorDetails = new JsonObject(); + errorDetails.addProperty("errorDetails", throwable.getMessage()); + publishAsyncEvent( + topicName, + requestId, + errorDetails.toString(), + httpStatus.getReasonPhrase(), + String.valueOf(httpStatus.value()) + ); + } +} + + + diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java new file mode 100644 index 00000000..de1fc95a --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java @@ -0,0 +1,89 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.UUID; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.Application; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.EventContent; + +/** + * Helper to create DmiAsyncRequestResponseEvent. + */ +@Slf4j +public class DmiAsyncRequestResponseEventCreator { + + private static final DateTimeFormatter dateTimeFormatter + = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Create an event. + * + * @param resourceDataAsJson the resource data as json + * @param topicParamInQuery the topic to send response to + * @param requestId the request id + * @param status the status of the request + * @param code the code of the response + * + * @return DmiAsyncRequestResponseEvent + */ + public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson, + final String topicParamInQuery, + final String requestId, + final String status, + final String code) { + final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent(); + + dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString()); + dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId); + dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName()); + dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1"); + dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName()); + dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery); + dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter)); + dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code)); + + return dmiAsyncRequestResponseEvent; + } + + @SneakyThrows + private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) { + final EventContent eventContent = new EventContent(); + + eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1"); + eventContent.setResponseStatus(status); + eventContent.setResponseCode(code); + + eventContent.setAdditionalProperty("response-data", + objectMapper.readValue(resourceDataAsJson, HashMap.class)); + + return eventContent; + } + +} diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java index f5e1839b..00fea330 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java @@ -18,27 +18,30 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.dmi.service; +package org.onap.cps.ncmp.dmi.notifications.async; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import lombok.RequiredArgsConstructor; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; -@Slf4j @Service -@AllArgsConstructor -public class NcmpKafkaPublisherService { +@RequiredArgsConstructor +public class DmiAsyncRequestResponseEventProducer { - private final NcmpKafkaPublisher ncmpKafkaPublisher; + private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate; + + @Value("${app.ncmp.async.topic}") + private String dmiNcmpTopic; /** - * publish the message to NCMP. + * Sends message to the configured topic with a message key. * - * @param messageKey message key - * @param message message payload + * @param requestId the request id + * @param dmiAsyncRequestResponseEvent the event to publish */ - public void publishToNcmp(final String messageKey, final Object message) { - log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey); - ncmpKafkaPublisher.sendMessage(messageKey, message); + public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) { + kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent); } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java index 4dbe852d..bdd1fff6 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java @@ -37,11 +37,12 @@ import org.onap.cps.ncmp.dmi.model.ModuleReferencesRequest; import org.onap.cps.ncmp.dmi.model.ModuleResourcesReadRequest; import org.onap.cps.ncmp.dmi.model.ModuleSet; import org.onap.cps.ncmp.dmi.model.YangResources; +import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor; import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi; import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi; import org.onap.cps.ncmp.dmi.service.DmiService; -import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService; import org.onap.cps.ncmp.dmi.service.model.ModuleReference; +import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RequestMapping; @@ -54,13 +55,13 @@ import org.springframework.web.bind.annotation.RestController; public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { private final DmiService dmiService; - private final ObjectMapper objectMapper; - - private final NcmpKafkaPublisherService ncmpKafkaPublisherService; - + private final AsyncTaskExecutor asyncTaskExecutor; private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6); + @Value("${notification.async.executor.time-out-value-in-ms:2000}") + private int timeOutInMillis; + static { operationToHttpStatusMap.put(null, HttpStatus.OK); operationToHttpStatusMap.put(OperationEnum.READ, HttpStatus.OK); @@ -70,10 +71,9 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT); } - @Override public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle, - final @Valid ModuleReferencesRequest body) { + final @Valid ModuleReferencesRequest body) { // For onap-dmi-plugin we don't need cmHandleProperties, so DataAccessReadRequest is not used. final ModuleSet moduleSet = dmiService.getModulesForCmHandle(cmHandle); return ResponseEntity.ok(moduleSet); @@ -104,66 +104,119 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi { } /** - * This method fetches the resource for given cm handle using pass through operational. It filters the response on - * the basis of options query parameters and returns response. Does not support write operations. + * This method fetches the resource for given cm handle using pass through operational datastore. It filters the + * response on the basis of options query parameters and returns response. Does not support write operations. * * @param resourceIdentifier resource identifier to fetch data * @param cmHandle cm handle identifier * @param dataAccessRequest data Access Request * @param optionsParamInQuery options query parameter - * @param topicParamInQuery optional topic parameter + * @param topicParamInQuery topic name for (triggering) async responses * @return {@code ResponseEntity} response entity */ @Override public ResponseEntity<Object> dataAccessPassthroughOperational(final String resourceIdentifier, final String cmHandle, - final @Valid DataAccessRequest - dataAccessRequest, + final @Valid DataAccessRequest dataAccessRequest, final @Valid String optionsParamInQuery, final String topicParamInQuery) { if (isReadOperation(dataAccessRequest)) { - final String resourceDataAsJson = dmiService.getResourceData(cmHandle, - resourceIdentifier, - optionsParamInQuery, - DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM); + if (hasTopic(topicParamInQuery)) { + return handleAsyncRequest(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery, + topicParamInQuery); + } + + final String resourceDataAsJson = dmiService.getResourceData(cmHandle, resourceIdentifier, + optionsParamInQuery, DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM); return ResponseEntity.ok(resourceDataAsJson); } return new ResponseEntity<>(HttpStatus.BAD_REQUEST); } + /** + * This method fetches the resource for given cm handle using pass through running datastore. It filters the + * response on the basis of options query parameters and returns response. It supports both read and write + * operation. + * + * @param resourceIdentifier resource identifier to fetch data + * @param cmHandle cm handle identifier + * @param dataAccessRequest data Access Request + * @param optionsParamInQuery options query parameter + * @param topicParamInQuery topic name for (triggering) async responses + * @return {@code ResponseEntity} response entity + */ @Override public ResponseEntity<Object> dataAccessPassthroughRunning(final String resourceIdentifier, final String cmHandle, - final @Valid DataAccessRequest - dataAccessRequest, + final @Valid DataAccessRequest dataAccessRequest, final @Valid String optionsParamInQuery, final String topicParamInQuery) { - final String sdncResponse; - if (isReadOperation(dataAccessRequest)) { - sdncResponse = dmiService.getResourceData(cmHandle, - resourceIdentifier, - optionsParamInQuery, - DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM); - } else { - sdncResponse = dmiService.writeData( + if (hasTopic(topicParamInQuery)) { + asyncTaskExecutor.executeAsyncTask(() -> + getSdncResponseForPassThroughRunning( + resourceIdentifier, + cmHandle, + dataAccessRequest, + optionsParamInQuery), + topicParamInQuery, + dataAccessRequest.getRequestId(), dataAccessRequest.getOperation(), - cmHandle, - resourceIdentifier, - dataAccessRequest.getDataType(), - dataAccessRequest.getData()); + timeOutInMillis + ); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); } + + final String sdncResponse = + getSdncResponseForPassThroughRunning(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery); return new ResponseEntity<>(sdncResponse, operationToHttpStatusMap.get(dataAccessRequest.getOperation())); } + private String getSdncResponseForPassThroughRunning(final String resourceIdentifier, + final String cmHandle, + final DataAccessRequest dataAccessRequest, + final String optionsParamInQuery) { + if (isReadOperation(dataAccessRequest)) { + return dmiService.getResourceData(cmHandle, resourceIdentifier, optionsParamInQuery, + DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM); + } + + return dmiService.writeData(dataAccessRequest.getOperation(), cmHandle, resourceIdentifier, + dataAccessRequest.getDataType(), dataAccessRequest.getData()); + } + private boolean isReadOperation(final @Valid DataAccessRequest dataAccessRequest) { return dataAccessRequest.getOperation() == null || dataAccessRequest.getOperation().equals(DataAccessRequest.OperationEnum.READ); } private List<ModuleReference> convertRestObjectToJavaApiObject( - final ModuleResourcesReadRequest moduleResourcesReadRequest) { + final ModuleResourcesReadRequest moduleResourcesReadRequest) { return objectMapper .convertValue(moduleResourcesReadRequest.getData().getModules(), - new TypeReference<List<ModuleReference>>() {}); + new TypeReference<List<ModuleReference>>() {}); + } + + private boolean hasTopic(final String topicParamInQuery) { + return !(topicParamInQuery == null || topicParamInQuery.isBlank()); } + + private ResponseEntity<Object> handleAsyncRequest(final String resourceIdentifier, + final String cmHandle, + final DataAccessRequest dataAccessRequest, + final String optionsParamInQuery, + final String topicParamInQuery) { + asyncTaskExecutor.executeAsyncTask(() -> + dmiService.getResourceData( + cmHandle, + resourceIdentifier, + optionsParamInQuery, + DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM), + topicParamInQuery, + dataAccessRequest.getRequestId(), + dataAccessRequest.getOperation(), + timeOutInMillis + ); + return new ResponseEntity<>(HttpStatus.NO_CONTENT); + } + } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java index 22d47442..753d16f7 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java @@ -33,9 +33,9 @@ import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties; import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException; import org.onap.cps.ncmp.dmi.exception.DmiException; +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException; import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException; import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException; -import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound; import org.onap.cps.ncmp.dmi.model.DataAccessRequest; import org.onap.cps.ncmp.dmi.model.ModuleSet; import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas; @@ -59,8 +59,6 @@ public class DmiServiceImpl implements DmiService { private NcmpRestClient ncmpRestClient; private ObjectMapper objectMapper; private DmiPluginProperties dmiPluginProperties; - private static final String RESPONSE_CODE = "response code : "; - private static final String MESSAGE = " message : "; /** * Constructor. @@ -107,8 +105,8 @@ public class DmiServiceImpl implements DmiService { "SDNC did not return a module resource for the given cmHandle."); } else { log.error("Error occurred when getting module resources from SDNC for the given cmHandle {}", cmHandle); - throw new DmiException(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); + throw new HttpClientRequestException( + cmHandle, responseEntity.getBody(), responseEntity.getStatusCode()); } } return yangResources; @@ -166,20 +164,14 @@ public class DmiServiceImpl implements DmiService { final String dataType, final String data) { final ResponseEntity<String> responseEntity = sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data); - if (responseEntity.getStatusCode().is2xxSuccessful()) { - return responseEntity.getBody(); - } else { - throw new DmiException(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); - } + return prepareAndSendResponse(responseEntity, cmHandle); } private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) { - if (responseEntity.getStatusCode() == HttpStatus.OK) { + if (responseEntity.getStatusCode().is2xxSuccessful()) { return responseEntity.getBody(); } else { - throw new ResourceDataNotFound(cmHandle, - RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody()); + throw new HttpClientRequestException(cmHandle, responseEntity.getBody(), responseEntity.getStatusCode()); } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java deleted file mode 100644 index 373a09d7..00000000 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.stereotype.Component; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; - -@Component -@Slf4j -public class NcmpKafkaPublisher { - - private final KafkaTemplate<String, Object> kafkaTemplate; - private final String topicName; - - /** - * KafkaTemplate and Topic name. - * - * @param kafkaTemplate kafka template - * @param topicName topic name - */ - @Autowired - public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate, - @Value("${app.ncmp.async-m2m.topic}") final String topicName) { - this.kafkaTemplate = kafkaTemplate; - this.topicName = topicName; - } - - /** - * Sends message to the configured topic with a message key. - * - * @param messageKey message key - * @param payload message payload - */ - public void sendMessage(final String messageKey, final Object payload) { - final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload); - send.addCallback(new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable ex) { - log.warn("Failed to send the messages {}", ex.getMessage()); - } - - @Override - public void onSuccess(final SendResult<String, Object> result) { - log.debug("Sent message {}", result.getProducerRecord()); - } - }); - } -} diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java index cf7b4599..179707ab 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2021-2022 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,9 +69,10 @@ public class SdncRestconfClient { * @param httpHeaders HTTP Headers * @return response entity */ - public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, final String resourceUrl, - final String jsonData, - final HttpHeaders httpHeaders) { + public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, + final String resourceUrl, + final String jsonData, + final HttpHeaders httpHeaders) { final String sdncBaseUrl = sdncProperties.getBaseUrl(); final String sdncRestconfUrl = sdncBaseUrl.concat(resourceUrl); httpHeaders.setBasicAuth(sdncProperties.getAuthUsername(), sdncProperties.getAuthPassword()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8be97d2d..6ad9d58d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -45,19 +45,24 @@ spring: pathmatch: matching-strategy: ANT_PATH_MATCHER kafka: - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092} security: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - client-id: dmi-plugin + client-id: ncmp-dmi-plugin app: ncmp: - async-m2m: + async: topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} +notification: + async: + executor: + time-out-value-in-ms: 2000 + # Actuator management: server: @@ -106,4 +111,4 @@ springdoc: urlsPrimaryName: query urls: - name: query - url: /api-docs/openapi.yaml
\ No newline at end of file + url: /api-docs/openapi.yaml |