summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/cps/ncmp/dmi/notifications/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/cps/ncmp/dmi/notifications/async')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java122
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java89
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java47
3 files changed, 258 insertions, 0 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
new file mode 100644
index 00000000..7189f6c9
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.google.gson.JsonObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AsyncTaskExecutor {
+
+ private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer;
+
+ private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator =
+ new DmiAsyncRequestResponseEventCreator();
+
+ private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+
+ static {
+ operationToHttpStatusMap.put(null, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT);
+ }
+
+ /**
+ * Execute task asynchronously and publish response to supplied topic.
+ *
+ * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param topicName topic name where message need to be published
+ * @param requestId unique requestId for async request
+ * @param operation the operation performed
+ * @param timeOutInMilliSeconds task timeout in milliseconds
+ */
+ public void executeAsyncTask(final Supplier<String> taskSupplier,
+ final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final int timeOutInMilliSeconds) {
+ CompletableFuture.supplyAsync(taskSupplier::get)
+ .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS)
+ .whenCompleteAsync((resourceDataAsJson, throwable) -> {
+ if (throwable == null) {
+ final String status = operationToHttpStatusMap.get(operation).getReasonPhrase();
+ final String code = String.valueOf(operationToHttpStatusMap.get(operation).value());
+ publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code);
+ } else {
+ log.error("Error occurred with async request {}", throwable.getMessage());
+ publishAsyncFailureEvent(topicName, requestId, operation, throwable);
+ }
+ });
+ log.info("Async task completed.");
+ }
+
+ private void publishAsyncEvent(final String topicName,
+ final String requestId,
+ final String resourceDataAsJson,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent =
+ dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code);
+
+ dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent);
+ }
+
+ protected void publishAsyncFailureEvent(final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final Throwable throwable) {
+ HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+
+ if (throwable instanceof HttpClientRequestException) {
+ final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable;
+ httpStatus = httpClientRequestException.getHttpStatus();
+ }
+
+ final JsonObject errorDetails = new JsonObject();
+ errorDetails.addProperty("errorDetails", throwable.getMessage());
+ publishAsyncEvent(
+ topicName,
+ requestId,
+ errorDetails.toString(),
+ httpStatus.getReasonPhrase(),
+ String.valueOf(httpStatus.value())
+ );
+ }
+}
+
+
+
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
new file mode 100644
index 00000000..de1fc95a
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.Application;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.EventContent;
+
+/**
+ * Helper to create DmiAsyncRequestResponseEvent.
+ */
+@Slf4j
+public class DmiAsyncRequestResponseEventCreator {
+
+ private static final DateTimeFormatter dateTimeFormatter
+ = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Create an event.
+ *
+ * @param resourceDataAsJson the resource data as json
+ * @param topicParamInQuery the topic to send response to
+ * @param requestId the request id
+ * @param status the status of the request
+ * @param code the code of the response
+ *
+ * @return DmiAsyncRequestResponseEvent
+ */
+ public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson,
+ final String topicParamInQuery,
+ final String requestId,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent();
+
+ dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString());
+ dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId);
+ dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName());
+ dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName());
+ dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery);
+ dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
+ dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code));
+
+ return dmiAsyncRequestResponseEvent;
+ }
+
+ @SneakyThrows
+ private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) {
+ final EventContent eventContent = new EventContent();
+
+ eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ eventContent.setResponseStatus(status);
+ eventContent.setResponseCode(code);
+
+ eventContent.setAdditionalProperty("response-data",
+ objectMapper.readValue(resourceDataAsJson, HashMap.class));
+
+ return eventContent;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
new file mode 100644
index 00000000..00fea330
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
@@ -0,0 +1,47 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class DmiAsyncRequestResponseEventProducer {
+
+ private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate;
+
+ @Value("${app.ncmp.async.topic}")
+ private String dmiNcmpTopic;
+
+ /**
+ * Sends message to the configured topic with a message key.
+ *
+ * @param requestId the request id
+ * @param dmiAsyncRequestResponseEvent the event to publish
+ */
+ public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) {
+ kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent);
+ }
+}