summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorJosephKeenan <joseph.keenan@est.tech>2022-05-23 15:43:05 +0100
committerJosephKeenan <joseph.keenan@est.tech>2022-06-01 16:56:50 +0100
commit2cd8b98223bd49975fcca0ec7f1d4673a4163074 (patch)
treed26e809ad7befaf8b7815271a6fcc501f67b3117 /src/main
parentdf26bc38a75f10650ce5785cdc9bd7b9516f6f25 (diff)
Async request response dmi -> NCMP
-Added Async for passthrough running and operational -Build will fail until cps is merged https://gerrit.onap.org/r/c/cps/+/128685 Issue-ID: CPS-830 Change-Id: Iedbfab109f5cd777a5be8eed7414758d0f5ec05c Signed-off-by: JosephKeenan <joseph.keenan@est.tech> Signed-off-by: ToineSiebelink <toine.siebelink@est.tech> Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java)18
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java122
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java89
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java)29
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java117
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java20
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java72
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java9
-rw-r--r--src/main/resources/application.yml13
9 files changed, 344 insertions, 145 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java
index dbef3477..b4b0249f 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2021-2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,19 +20,25 @@
package org.onap.cps.ncmp.dmi.exception;
-public class ResourceDataNotFound extends DmiException {
+import lombok.Getter;
+import org.springframework.http.HttpStatus;
+
+@Getter
+public class HttpClientRequestException extends DmiException {
private static final long serialVersionUID = 881438585188332404L;
- private static final String ERROR_MESSAGE = "Resource data not found for the given cmHandles: ";
+ private final HttpStatus httpStatus;
/**
* Constructor.
*
* @param cmHandle cmHandle identifier
- * @param details the error details
+ * @param details response body from the client available as details
+ * @param httpStatus http status from the client
*/
- public ResourceDataNotFound(final String cmHandle, final String details) {
- super(ERROR_MESSAGE + cmHandle, details);
+ public HttpClientRequestException(final String cmHandle, final String details, final HttpStatus httpStatus) {
+ super("Resource data request failed for CM Handle: " + cmHandle, details);
+ this.httpStatus = httpStatus;
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
new file mode 100644
index 00000000..7189f6c9
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.google.gson.JsonObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AsyncTaskExecutor {
+
+ private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer;
+
+ private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator =
+ new DmiAsyncRequestResponseEventCreator();
+
+ private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+
+ static {
+ operationToHttpStatusMap.put(null, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT);
+ }
+
+ /**
+ * Execute task asynchronously and publish response to supplied topic.
+ *
+ * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param topicName topic name where message need to be published
+ * @param requestId unique requestId for async request
+ * @param operation the operation performed
+ * @param timeOutInMilliSeconds task timeout in milliseconds
+ */
+ public void executeAsyncTask(final Supplier<String> taskSupplier,
+ final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final int timeOutInMilliSeconds) {
+ CompletableFuture.supplyAsync(taskSupplier::get)
+ .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS)
+ .whenCompleteAsync((resourceDataAsJson, throwable) -> {
+ if (throwable == null) {
+ final String status = operationToHttpStatusMap.get(operation).getReasonPhrase();
+ final String code = String.valueOf(operationToHttpStatusMap.get(operation).value());
+ publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code);
+ } else {
+ log.error("Error occurred with async request {}", throwable.getMessage());
+ publishAsyncFailureEvent(topicName, requestId, operation, throwable);
+ }
+ });
+ log.info("Async task completed.");
+ }
+
+ private void publishAsyncEvent(final String topicName,
+ final String requestId,
+ final String resourceDataAsJson,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent =
+ dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code);
+
+ dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent);
+ }
+
+ protected void publishAsyncFailureEvent(final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final Throwable throwable) {
+ HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+
+ if (throwable instanceof HttpClientRequestException) {
+ final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable;
+ httpStatus = httpClientRequestException.getHttpStatus();
+ }
+
+ final JsonObject errorDetails = new JsonObject();
+ errorDetails.addProperty("errorDetails", throwable.getMessage());
+ publishAsyncEvent(
+ topicName,
+ requestId,
+ errorDetails.toString(),
+ httpStatus.getReasonPhrase(),
+ String.valueOf(httpStatus.value())
+ );
+ }
+}
+
+
+
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
new file mode 100644
index 00000000..de1fc95a
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.Application;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.EventContent;
+
+/**
+ * Helper to create DmiAsyncRequestResponseEvent.
+ */
+@Slf4j
+public class DmiAsyncRequestResponseEventCreator {
+
+ private static final DateTimeFormatter dateTimeFormatter
+ = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Create an event.
+ *
+ * @param resourceDataAsJson the resource data as json
+ * @param topicParamInQuery the topic to send response to
+ * @param requestId the request id
+ * @param status the status of the request
+ * @param code the code of the response
+ *
+ * @return DmiAsyncRequestResponseEvent
+ */
+ public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson,
+ final String topicParamInQuery,
+ final String requestId,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent();
+
+ dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString());
+ dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId);
+ dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName());
+ dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName());
+ dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery);
+ dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
+ dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code));
+
+ return dmiAsyncRequestResponseEvent;
+ }
+
+ @SneakyThrows
+ private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) {
+ final EventContent eventContent = new EventContent();
+
+ eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ eventContent.setResponseStatus(status);
+ eventContent.setResponseCode(code);
+
+ eventContent.setAdditionalProperty("response-data",
+ objectMapper.readValue(resourceDataAsJson, HashMap.class));
+
+ return eventContent;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
index f5e1839b..00fea330 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
@@ -18,27 +18,30 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.dmi.service;
+package org.onap.cps.ncmp.dmi.notifications.async;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
-@Slf4j
@Service
-@AllArgsConstructor
-public class NcmpKafkaPublisherService {
+@RequiredArgsConstructor
+public class DmiAsyncRequestResponseEventProducer {
- private final NcmpKafkaPublisher ncmpKafkaPublisher;
+ private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate;
+
+ @Value("${app.ncmp.async.topic}")
+ private String dmiNcmpTopic;
/**
- * publish the message to NCMP.
+ * Sends message to the configured topic with a message key.
*
- * @param messageKey message key
- * @param message message payload
+ * @param requestId the request id
+ * @param dmiAsyncRequestResponseEvent the event to publish
*/
- public void publishToNcmp(final String messageKey, final Object message) {
- log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey);
- ncmpKafkaPublisher.sendMessage(messageKey, message);
+ public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) {
+ kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent);
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
index 4dbe852d..bdd1fff6 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
@@ -37,11 +37,12 @@ import org.onap.cps.ncmp.dmi.model.ModuleReferencesRequest;
import org.onap.cps.ncmp.dmi.model.ModuleResourcesReadRequest;
import org.onap.cps.ncmp.dmi.model.ModuleSet;
import org.onap.cps.ncmp.dmi.model.YangResources;
+import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi;
import org.onap.cps.ncmp.dmi.service.DmiService;
-import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService;
import org.onap.cps.ncmp.dmi.service.model.ModuleReference;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -54,13 +55,13 @@ import org.springframework.web.bind.annotation.RestController;
public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
private final DmiService dmiService;
-
private final ObjectMapper objectMapper;
-
- private final NcmpKafkaPublisherService ncmpKafkaPublisherService;
-
+ private final AsyncTaskExecutor asyncTaskExecutor;
private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+ @Value("${notification.async.executor.time-out-value-in-ms:2000}")
+ private int timeOutInMillis;
+
static {
operationToHttpStatusMap.put(null, HttpStatus.OK);
operationToHttpStatusMap.put(OperationEnum.READ, HttpStatus.OK);
@@ -70,10 +71,9 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT);
}
-
@Override
public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle,
- final @Valid ModuleReferencesRequest body) {
+ final @Valid ModuleReferencesRequest body) {
// For onap-dmi-plugin we don't need cmHandleProperties, so DataAccessReadRequest is not used.
final ModuleSet moduleSet = dmiService.getModulesForCmHandle(cmHandle);
return ResponseEntity.ok(moduleSet);
@@ -104,66 +104,119 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
}
/**
- * This method fetches the resource for given cm handle using pass through operational. It filters the response on
- * the basis of options query parameters and returns response. Does not support write operations.
+ * This method fetches the resource for given cm handle using pass through operational datastore. It filters the
+ * response on the basis of options query parameters and returns response. Does not support write operations.
*
* @param resourceIdentifier resource identifier to fetch data
* @param cmHandle cm handle identifier
* @param dataAccessRequest data Access Request
* @param optionsParamInQuery options query parameter
- * @param topicParamInQuery optional topic parameter
+ * @param topicParamInQuery topic name for (triggering) async responses
* @return {@code ResponseEntity} response entity
*/
@Override
public ResponseEntity<Object> dataAccessPassthroughOperational(final String resourceIdentifier,
final String cmHandle,
- final @Valid DataAccessRequest
- dataAccessRequest,
+ final @Valid DataAccessRequest dataAccessRequest,
final @Valid String optionsParamInQuery,
final String topicParamInQuery) {
if (isReadOperation(dataAccessRequest)) {
- final String resourceDataAsJson = dmiService.getResourceData(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
+ if (hasTopic(topicParamInQuery)) {
+ return handleAsyncRequest(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery,
+ topicParamInQuery);
+ }
+
+ final String resourceDataAsJson = dmiService.getResourceData(cmHandle, resourceIdentifier,
+ optionsParamInQuery, DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
return ResponseEntity.ok(resourceDataAsJson);
}
return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
}
+ /**
+ * This method fetches the resource for given cm handle using pass through running datastore. It filters the
+ * response on the basis of options query parameters and returns response. It supports both read and write
+ * operation.
+ *
+ * @param resourceIdentifier resource identifier to fetch data
+ * @param cmHandle cm handle identifier
+ * @param dataAccessRequest data Access Request
+ * @param optionsParamInQuery options query parameter
+ * @param topicParamInQuery topic name for (triggering) async responses
+ * @return {@code ResponseEntity} response entity
+ */
@Override
public ResponseEntity<Object> dataAccessPassthroughRunning(final String resourceIdentifier,
final String cmHandle,
- final @Valid DataAccessRequest
- dataAccessRequest,
+ final @Valid DataAccessRequest dataAccessRequest,
final @Valid String optionsParamInQuery,
final String topicParamInQuery) {
- final String sdncResponse;
- if (isReadOperation(dataAccessRequest)) {
- sdncResponse = dmiService.getResourceData(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
- } else {
- sdncResponse = dmiService.writeData(
+ if (hasTopic(topicParamInQuery)) {
+ asyncTaskExecutor.executeAsyncTask(() ->
+ getSdncResponseForPassThroughRunning(
+ resourceIdentifier,
+ cmHandle,
+ dataAccessRequest,
+ optionsParamInQuery),
+ topicParamInQuery,
+ dataAccessRequest.getRequestId(),
dataAccessRequest.getOperation(),
- cmHandle,
- resourceIdentifier,
- dataAccessRequest.getDataType(),
- dataAccessRequest.getData());
+ timeOutInMillis
+ );
+ return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
+
+ final String sdncResponse =
+ getSdncResponseForPassThroughRunning(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery);
return new ResponseEntity<>(sdncResponse, operationToHttpStatusMap.get(dataAccessRequest.getOperation()));
}
+ private String getSdncResponseForPassThroughRunning(final String resourceIdentifier,
+ final String cmHandle,
+ final DataAccessRequest dataAccessRequest,
+ final String optionsParamInQuery) {
+ if (isReadOperation(dataAccessRequest)) {
+ return dmiService.getResourceData(cmHandle, resourceIdentifier, optionsParamInQuery,
+ DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
+ }
+
+ return dmiService.writeData(dataAccessRequest.getOperation(), cmHandle, resourceIdentifier,
+ dataAccessRequest.getDataType(), dataAccessRequest.getData());
+ }
+
private boolean isReadOperation(final @Valid DataAccessRequest dataAccessRequest) {
return dataAccessRequest.getOperation() == null
|| dataAccessRequest.getOperation().equals(DataAccessRequest.OperationEnum.READ);
}
private List<ModuleReference> convertRestObjectToJavaApiObject(
- final ModuleResourcesReadRequest moduleResourcesReadRequest) {
+ final ModuleResourcesReadRequest moduleResourcesReadRequest) {
return objectMapper
.convertValue(moduleResourcesReadRequest.getData().getModules(),
- new TypeReference<List<ModuleReference>>() {});
+ new TypeReference<List<ModuleReference>>() {});
+ }
+
+ private boolean hasTopic(final String topicParamInQuery) {
+ return !(topicParamInQuery == null || topicParamInQuery.isBlank());
}
+
+ private ResponseEntity<Object> handleAsyncRequest(final String resourceIdentifier,
+ final String cmHandle,
+ final DataAccessRequest dataAccessRequest,
+ final String optionsParamInQuery,
+ final String topicParamInQuery) {
+ asyncTaskExecutor.executeAsyncTask(() ->
+ dmiService.getResourceData(
+ cmHandle,
+ resourceIdentifier,
+ optionsParamInQuery,
+ DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM),
+ topicParamInQuery,
+ dataAccessRequest.getRequestId(),
+ dataAccessRequest.getOperation(),
+ timeOutInMillis
+ );
+ return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+ }
+
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
index 22d47442..753d16f7 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
@@ -33,9 +33,9 @@ import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties;
import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException;
import org.onap.cps.ncmp.dmi.exception.DmiException;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException;
import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException;
-import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound;
import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
import org.onap.cps.ncmp.dmi.model.ModuleSet;
import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas;
@@ -59,8 +59,6 @@ public class DmiServiceImpl implements DmiService {
private NcmpRestClient ncmpRestClient;
private ObjectMapper objectMapper;
private DmiPluginProperties dmiPluginProperties;
- private static final String RESPONSE_CODE = "response code : ";
- private static final String MESSAGE = " message : ";
/**
* Constructor.
@@ -107,8 +105,8 @@ public class DmiServiceImpl implements DmiService {
"SDNC did not return a module resource for the given cmHandle.");
} else {
log.error("Error occurred when getting module resources from SDNC for the given cmHandle {}", cmHandle);
- throw new DmiException(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+ throw new HttpClientRequestException(
+ cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
}
}
return yangResources;
@@ -166,20 +164,14 @@ public class DmiServiceImpl implements DmiService {
final String dataType, final String data) {
final ResponseEntity<String> responseEntity =
sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data);
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- return responseEntity.getBody();
- } else {
- throw new DmiException(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
- }
+ return prepareAndSendResponse(responseEntity, cmHandle);
}
private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
- if (responseEntity.getStatusCode() == HttpStatus.OK) {
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
return responseEntity.getBody();
} else {
- throw new ResourceDataNotFound(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+ throw new HttpClientRequestException(cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
deleted file mode 100644
index 373a09d7..00000000
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.stereotype.Component;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-@Component
-@Slf4j
-public class NcmpKafkaPublisher {
-
- private final KafkaTemplate<String, Object> kafkaTemplate;
- private final String topicName;
-
- /**
- * KafkaTemplate and Topic name.
- *
- * @param kafkaTemplate kafka template
- * @param topicName topic name
- */
- @Autowired
- public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate,
- @Value("${app.ncmp.async-m2m.topic}") final String topicName) {
- this.kafkaTemplate = kafkaTemplate;
- this.topicName = topicName;
- }
-
- /**
- * Sends message to the configured topic with a message key.
- *
- * @param messageKey message key
- * @param payload message payload
- */
- public void sendMessage(final String messageKey, final Object payload) {
- final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload);
- send.addCallback(new ListenableFutureCallback<>() {
- @Override
- public void onFailure(final Throwable ex) {
- log.warn("Failed to send the messages {}", ex.getMessage());
- }
-
- @Override
- public void onSuccess(final SendResult<String, Object> result) {
- log.debug("Sent message {}", result.getProducerRecord());
- }
- });
- }
-}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
index cf7b4599..179707ab 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2021-2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -69,9 +69,10 @@ public class SdncRestconfClient {
* @param httpHeaders HTTP Headers
* @return response entity
*/
- public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, final String resourceUrl,
- final String jsonData,
- final HttpHeaders httpHeaders) {
+ public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod,
+ final String resourceUrl,
+ final String jsonData,
+ final HttpHeaders httpHeaders) {
final String sdncBaseUrl = sdncProperties.getBaseUrl();
final String sdncRestconfUrl = sdncBaseUrl.concat(resourceUrl);
httpHeaders.setBasicAuth(sdncProperties.getAuthUsername(), sdncProperties.getAuthPassword());
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 8be97d2d..6ad9d58d 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -45,19 +45,24 @@ spring:
pathmatch:
matching-strategy: ANT_PATH_MATCHER
kafka:
- bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092}
security:
protocol: PLAINTEXT
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- client-id: dmi-plugin
+ client-id: ncmp-dmi-plugin
app:
ncmp:
- async-m2m:
+ async:
topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
+notification:
+ async:
+ executor:
+ time-out-value-in-ms: 2000
+
# Actuator
management:
server:
@@ -106,4 +111,4 @@ springdoc:
urlsPrimaryName: query
urls:
- name: query
- url: /api-docs/openapi.yaml \ No newline at end of file
+ url: /api-docs/openapi.yaml