summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java)18
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java122
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java89
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java (renamed from src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java)29
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java117
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java20
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java72
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java9
-rw-r--r--src/main/resources/application.yml13
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy136
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy28
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy4
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy41
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy107
-rw-r--r--src/test/resources/application.yml11
15 files changed, 514 insertions, 302 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java
index dbef3477..b4b0249f 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/exception/ResourceDataNotFound.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/exception/HttpClientRequestException.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2021-2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,19 +20,25 @@
package org.onap.cps.ncmp.dmi.exception;
-public class ResourceDataNotFound extends DmiException {
+import lombok.Getter;
+import org.springframework.http.HttpStatus;
+
+@Getter
+public class HttpClientRequestException extends DmiException {
private static final long serialVersionUID = 881438585188332404L;
- private static final String ERROR_MESSAGE = "Resource data not found for the given cmHandles: ";
+ private final HttpStatus httpStatus;
/**
* Constructor.
*
* @param cmHandle cmHandle identifier
- * @param details the error details
+ * @param details response body from the client available as details
+ * @param httpStatus http status from the client
*/
- public ResourceDataNotFound(final String cmHandle, final String details) {
- super(ERROR_MESSAGE + cmHandle, details);
+ public HttpClientRequestException(final String cmHandle, final String details, final HttpStatus httpStatus) {
+ super("Resource data request failed for CM Handle: " + cmHandle, details);
+ this.httpStatus = httpStatus;
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
new file mode 100644
index 00000000..7189f6c9
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.google.gson.JsonObject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class AsyncTaskExecutor {
+
+ private final DmiAsyncRequestResponseEventProducer dmiAsyncRequestResponseEventProducer;
+
+ private static final DmiAsyncRequestResponseEventCreator dmiAsyncRequestResponseEventCreator =
+ new DmiAsyncRequestResponseEventCreator();
+
+ private static final Map<DataAccessRequest.OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+
+ static {
+ operationToHttpStatusMap.put(null, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.READ, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.CREATE, HttpStatus.CREATED);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.PATCH, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.UPDATE, HttpStatus.OK);
+ operationToHttpStatusMap.put(DataAccessRequest.OperationEnum.DELETE, HttpStatus.NO_CONTENT);
+ }
+
+ /**
+ * Execute task asynchronously and publish response to supplied topic.
+ *
+ * @param taskSupplier functional method is get() task need to executed asynchronously
+ * @param topicName topic name where message need to be published
+ * @param requestId unique requestId for async request
+ * @param operation the operation performed
+ * @param timeOutInMilliSeconds task timeout in milliseconds
+ */
+ public void executeAsyncTask(final Supplier<String> taskSupplier,
+ final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final int timeOutInMilliSeconds) {
+ CompletableFuture.supplyAsync(taskSupplier::get)
+ .orTimeout(timeOutInMilliSeconds, TimeUnit.MILLISECONDS)
+ .whenCompleteAsync((resourceDataAsJson, throwable) -> {
+ if (throwable == null) {
+ final String status = operationToHttpStatusMap.get(operation).getReasonPhrase();
+ final String code = String.valueOf(operationToHttpStatusMap.get(operation).value());
+ publishAsyncEvent(topicName, requestId, resourceDataAsJson, status, code);
+ } else {
+ log.error("Error occurred with async request {}", throwable.getMessage());
+ publishAsyncFailureEvent(topicName, requestId, operation, throwable);
+ }
+ });
+ log.info("Async task completed.");
+ }
+
+ private void publishAsyncEvent(final String topicName,
+ final String requestId,
+ final String resourceDataAsJson,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent cpsAsyncRequestResponseEvent =
+ dmiAsyncRequestResponseEventCreator.createEvent(resourceDataAsJson, topicName, requestId, status, code);
+
+ dmiAsyncRequestResponseEventProducer.sendMessage(requestId, cpsAsyncRequestResponseEvent);
+ }
+
+ protected void publishAsyncFailureEvent(final String topicName,
+ final String requestId,
+ final DataAccessRequest.OperationEnum operation,
+ final Throwable throwable) {
+ HttpStatus httpStatus = HttpStatus.INTERNAL_SERVER_ERROR;
+
+ if (throwable instanceof HttpClientRequestException) {
+ final HttpClientRequestException httpClientRequestException = (HttpClientRequestException) throwable;
+ httpStatus = httpClientRequestException.getHttpStatus();
+ }
+
+ final JsonObject errorDetails = new JsonObject();
+ errorDetails.addProperty("errorDetails", throwable.getMessage());
+ publishAsyncEvent(
+ topicName,
+ requestId,
+ errorDetails.toString(),
+ httpStatus.getReasonPhrase(),
+ String.valueOf(httpStatus.value())
+ );
+ }
+}
+
+
+
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
new file mode 100644
index 00000000..de1fc95a
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventCreator.java
@@ -0,0 +1,89 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.Application;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.EventContent;
+
+/**
+ * Helper to create DmiAsyncRequestResponseEvent.
+ */
+@Slf4j
+public class DmiAsyncRequestResponseEventCreator {
+
+ private static final DateTimeFormatter dateTimeFormatter
+ = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Create an event.
+ *
+ * @param resourceDataAsJson the resource data as json
+ * @param topicParamInQuery the topic to send response to
+ * @param requestId the request id
+ * @param status the status of the request
+ * @param code the code of the response
+ *
+ * @return DmiAsyncRequestResponseEvent
+ */
+ public DmiAsyncRequestResponseEvent createEvent(final String resourceDataAsJson,
+ final String topicParamInQuery,
+ final String requestId,
+ final String status,
+ final String code) {
+ final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent();
+
+ dmiAsyncRequestResponseEvent.setEventId(UUID.randomUUID().toString());
+ dmiAsyncRequestResponseEvent.setEventCorrelationId(requestId);
+ dmiAsyncRequestResponseEvent.setEventType(DmiAsyncRequestResponseEvent.class.getName());
+ dmiAsyncRequestResponseEvent.setEventSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ dmiAsyncRequestResponseEvent.setEventSource(Application.class.getPackageName());
+ dmiAsyncRequestResponseEvent.setEventTarget(topicParamInQuery);
+ dmiAsyncRequestResponseEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
+ dmiAsyncRequestResponseEvent.setEventContent(getEventContent(resourceDataAsJson, status, code));
+
+ return dmiAsyncRequestResponseEvent;
+ }
+
+ @SneakyThrows
+ private EventContent getEventContent(final String resourceDataAsJson, final String status, final String code) {
+ final EventContent eventContent = new EventContent();
+
+ eventContent.setResponseDataSchema("urn:cps:" + DmiAsyncRequestResponseEvent.class.getName() + ":v1");
+ eventContent.setResponseStatus(status);
+ eventContent.setResponseCode(code);
+
+ eventContent.setAdditionalProperty("response-data",
+ objectMapper.readValue(resourceDataAsJson, HashMap.class));
+
+ return eventContent;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
index f5e1839b..00fea330 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/async/DmiAsyncRequestResponseEventProducer.java
@@ -18,27 +18,30 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.cps.ncmp.dmi.service;
+package org.onap.cps.ncmp.dmi.notifications.async;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
+import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
-@Slf4j
@Service
-@AllArgsConstructor
-public class NcmpKafkaPublisherService {
+@RequiredArgsConstructor
+public class DmiAsyncRequestResponseEventProducer {
- private final NcmpKafkaPublisher ncmpKafkaPublisher;
+ private final KafkaTemplate<String, DmiAsyncRequestResponseEvent> kafkaTemplate;
+
+ @Value("${app.ncmp.async.topic}")
+ private String dmiNcmpTopic;
/**
- * publish the message to NCMP.
+ * Sends message to the configured topic with a message key.
*
- * @param messageKey message key
- * @param message message payload
+ * @param requestId the request id
+ * @param dmiAsyncRequestResponseEvent the event to publish
*/
- public void publishToNcmp(final String messageKey, final Object message) {
- log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey);
- ncmpKafkaPublisher.sendMessage(messageKey, message);
+ public void sendMessage(final String requestId, final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) {
+ kafkaTemplate.send(dmiNcmpTopic, requestId, dmiAsyncRequestResponseEvent);
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
index 4dbe852d..bdd1fff6 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
@@ -37,11 +37,12 @@ import org.onap.cps.ncmp.dmi.model.ModuleReferencesRequest;
import org.onap.cps.ncmp.dmi.model.ModuleResourcesReadRequest;
import org.onap.cps.ncmp.dmi.model.ModuleSet;
import org.onap.cps.ncmp.dmi.model.YangResources;
+import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi;
import org.onap.cps.ncmp.dmi.service.DmiService;
-import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService;
import org.onap.cps.ncmp.dmi.service.model.ModuleReference;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -54,13 +55,13 @@ import org.springframework.web.bind.annotation.RestController;
public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
private final DmiService dmiService;
-
private final ObjectMapper objectMapper;
-
- private final NcmpKafkaPublisherService ncmpKafkaPublisherService;
-
+ private final AsyncTaskExecutor asyncTaskExecutor;
private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
+ @Value("${notification.async.executor.time-out-value-in-ms:2000}")
+ private int timeOutInMillis;
+
static {
operationToHttpStatusMap.put(null, HttpStatus.OK);
operationToHttpStatusMap.put(OperationEnum.READ, HttpStatus.OK);
@@ -70,10 +71,9 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT);
}
-
@Override
public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle,
- final @Valid ModuleReferencesRequest body) {
+ final @Valid ModuleReferencesRequest body) {
// For onap-dmi-plugin we don't need cmHandleProperties, so DataAccessReadRequest is not used.
final ModuleSet moduleSet = dmiService.getModulesForCmHandle(cmHandle);
return ResponseEntity.ok(moduleSet);
@@ -104,66 +104,119 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
}
/**
- * This method fetches the resource for given cm handle using pass through operational. It filters the response on
- * the basis of options query parameters and returns response. Does not support write operations.
+ * This method fetches the resource for given cm handle using pass through operational datastore. It filters the
+ * response on the basis of options query parameters and returns response. Does not support write operations.
*
* @param resourceIdentifier resource identifier to fetch data
* @param cmHandle cm handle identifier
* @param dataAccessRequest data Access Request
* @param optionsParamInQuery options query parameter
- * @param topicParamInQuery optional topic parameter
+ * @param topicParamInQuery topic name for (triggering) async responses
* @return {@code ResponseEntity} response entity
*/
@Override
public ResponseEntity<Object> dataAccessPassthroughOperational(final String resourceIdentifier,
final String cmHandle,
- final @Valid DataAccessRequest
- dataAccessRequest,
+ final @Valid DataAccessRequest dataAccessRequest,
final @Valid String optionsParamInQuery,
final String topicParamInQuery) {
if (isReadOperation(dataAccessRequest)) {
- final String resourceDataAsJson = dmiService.getResourceData(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
+ if (hasTopic(topicParamInQuery)) {
+ return handleAsyncRequest(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery,
+ topicParamInQuery);
+ }
+
+ final String resourceDataAsJson = dmiService.getResourceData(cmHandle, resourceIdentifier,
+ optionsParamInQuery, DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM);
return ResponseEntity.ok(resourceDataAsJson);
}
return new ResponseEntity<>(HttpStatus.BAD_REQUEST);
}
+ /**
+ * This method fetches the resource for given cm handle using pass through running datastore. It filters the
+ * response on the basis of options query parameters and returns response. It supports both read and write
+ * operation.
+ *
+ * @param resourceIdentifier resource identifier to fetch data
+ * @param cmHandle cm handle identifier
+ * @param dataAccessRequest data Access Request
+ * @param optionsParamInQuery options query parameter
+ * @param topicParamInQuery topic name for (triggering) async responses
+ * @return {@code ResponseEntity} response entity
+ */
@Override
public ResponseEntity<Object> dataAccessPassthroughRunning(final String resourceIdentifier,
final String cmHandle,
- final @Valid DataAccessRequest
- dataAccessRequest,
+ final @Valid DataAccessRequest dataAccessRequest,
final @Valid String optionsParamInQuery,
final String topicParamInQuery) {
- final String sdncResponse;
- if (isReadOperation(dataAccessRequest)) {
- sdncResponse = dmiService.getResourceData(cmHandle,
- resourceIdentifier,
- optionsParamInQuery,
- DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
- } else {
- sdncResponse = dmiService.writeData(
+ if (hasTopic(topicParamInQuery)) {
+ asyncTaskExecutor.executeAsyncTask(() ->
+ getSdncResponseForPassThroughRunning(
+ resourceIdentifier,
+ cmHandle,
+ dataAccessRequest,
+ optionsParamInQuery),
+ topicParamInQuery,
+ dataAccessRequest.getRequestId(),
dataAccessRequest.getOperation(),
- cmHandle,
- resourceIdentifier,
- dataAccessRequest.getDataType(),
- dataAccessRequest.getData());
+ timeOutInMillis
+ );
+ return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
+
+ final String sdncResponse =
+ getSdncResponseForPassThroughRunning(resourceIdentifier, cmHandle, dataAccessRequest, optionsParamInQuery);
return new ResponseEntity<>(sdncResponse, operationToHttpStatusMap.get(dataAccessRequest.getOperation()));
}
+ private String getSdncResponseForPassThroughRunning(final String resourceIdentifier,
+ final String cmHandle,
+ final DataAccessRequest dataAccessRequest,
+ final String optionsParamInQuery) {
+ if (isReadOperation(dataAccessRequest)) {
+ return dmiService.getResourceData(cmHandle, resourceIdentifier, optionsParamInQuery,
+ DmiService.RESTCONF_CONTENT_PASSTHROUGH_RUNNING_QUERY_PARAM);
+ }
+
+ return dmiService.writeData(dataAccessRequest.getOperation(), cmHandle, resourceIdentifier,
+ dataAccessRequest.getDataType(), dataAccessRequest.getData());
+ }
+
private boolean isReadOperation(final @Valid DataAccessRequest dataAccessRequest) {
return dataAccessRequest.getOperation() == null
|| dataAccessRequest.getOperation().equals(DataAccessRequest.OperationEnum.READ);
}
private List<ModuleReference> convertRestObjectToJavaApiObject(
- final ModuleResourcesReadRequest moduleResourcesReadRequest) {
+ final ModuleResourcesReadRequest moduleResourcesReadRequest) {
return objectMapper
.convertValue(moduleResourcesReadRequest.getData().getModules(),
- new TypeReference<List<ModuleReference>>() {});
+ new TypeReference<List<ModuleReference>>() {});
+ }
+
+ private boolean hasTopic(final String topicParamInQuery) {
+ return !(topicParamInQuery == null || topicParamInQuery.isBlank());
}
+
+ private ResponseEntity<Object> handleAsyncRequest(final String resourceIdentifier,
+ final String cmHandle,
+ final DataAccessRequest dataAccessRequest,
+ final String optionsParamInQuery,
+ final String topicParamInQuery) {
+ asyncTaskExecutor.executeAsyncTask(() ->
+ dmiService.getResourceData(
+ cmHandle,
+ resourceIdentifier,
+ optionsParamInQuery,
+ DmiService.RESTCONF_CONTENT_PASSTHROUGH_OPERATIONAL_QUERY_PARAM),
+ topicParamInQuery,
+ dataAccessRequest.getRequestId(),
+ dataAccessRequest.getOperation(),
+ timeOutInMillis
+ );
+ return new ResponseEntity<>(HttpStatus.NO_CONTENT);
+ }
+
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
index 22d47442..753d16f7 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/DmiServiceImpl.java
@@ -33,9 +33,9 @@ import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.dmi.config.DmiPluginConfig.DmiPluginProperties;
import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException;
import org.onap.cps.ncmp.dmi.exception.DmiException;
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException;
import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException;
import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException;
-import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound;
import org.onap.cps.ncmp.dmi.model.DataAccessRequest;
import org.onap.cps.ncmp.dmi.model.ModuleSet;
import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas;
@@ -59,8 +59,6 @@ public class DmiServiceImpl implements DmiService {
private NcmpRestClient ncmpRestClient;
private ObjectMapper objectMapper;
private DmiPluginProperties dmiPluginProperties;
- private static final String RESPONSE_CODE = "response code : ";
- private static final String MESSAGE = " message : ";
/**
* Constructor.
@@ -107,8 +105,8 @@ public class DmiServiceImpl implements DmiService {
"SDNC did not return a module resource for the given cmHandle.");
} else {
log.error("Error occurred when getting module resources from SDNC for the given cmHandle {}", cmHandle);
- throw new DmiException(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+ throw new HttpClientRequestException(
+ cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
}
}
return yangResources;
@@ -166,20 +164,14 @@ public class DmiServiceImpl implements DmiService {
final String dataType, final String data) {
final ResponseEntity<String> responseEntity =
sdncOperations.writeData(operation, cmHandle, resourceIdentifier, dataType, data);
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
- return responseEntity.getBody();
- } else {
- throw new DmiException(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
- }
+ return prepareAndSendResponse(responseEntity, cmHandle);
}
private String prepareAndSendResponse(final ResponseEntity<String> responseEntity, final String cmHandle) {
- if (responseEntity.getStatusCode() == HttpStatus.OK) {
+ if (responseEntity.getStatusCode().is2xxSuccessful()) {
return responseEntity.getBody();
} else {
- throw new ResourceDataNotFound(cmHandle,
- RESPONSE_CODE + responseEntity.getStatusCode() + MESSAGE + responseEntity.getBody());
+ throw new HttpClientRequestException(cmHandle, responseEntity.getBody(), responseEntity.getStatusCode());
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
deleted file mode 100644
index 373a09d7..00000000
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.stereotype.Component;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.ListenableFutureCallback;
-
-@Component
-@Slf4j
-public class NcmpKafkaPublisher {
-
- private final KafkaTemplate<String, Object> kafkaTemplate;
- private final String topicName;
-
- /**
- * KafkaTemplate and Topic name.
- *
- * @param kafkaTemplate kafka template
- * @param topicName topic name
- */
- @Autowired
- public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate,
- @Value("${app.ncmp.async-m2m.topic}") final String topicName) {
- this.kafkaTemplate = kafkaTemplate;
- this.topicName = topicName;
- }
-
- /**
- * Sends message to the configured topic with a message key.
- *
- * @param messageKey message key
- * @param payload message payload
- */
- public void sendMessage(final String messageKey, final Object payload) {
- final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload);
- send.addCallback(new ListenableFutureCallback<>() {
- @Override
- public void onFailure(final Throwable ex) {
- log.warn("Failed to send the messages {}", ex.getMessage());
- }
-
- @Override
- public void onSuccess(final SendResult<String, Object> result) {
- log.debug("Sent message {}", result.getProducerRecord());
- }
- });
- }
-}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
index cf7b4599..179707ab 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/client/SdncRestconfClient.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2021-2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -69,9 +69,10 @@ public class SdncRestconfClient {
* @param httpHeaders HTTP Headers
* @return response entity
*/
- public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod, final String resourceUrl,
- final String jsonData,
- final HttpHeaders httpHeaders) {
+ public ResponseEntity<String> httpOperationWithJsonData(final HttpMethod httpMethod,
+ final String resourceUrl,
+ final String jsonData,
+ final HttpHeaders httpHeaders) {
final String sdncBaseUrl = sdncProperties.getBaseUrl();
final String sdncRestconfUrl = sdncBaseUrl.concat(resourceUrl);
httpHeaders.setBasicAuth(sdncProperties.getAuthUsername(), sdncProperties.getAuthPassword());
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 8be97d2d..6ad9d58d 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -45,19 +45,24 @@ spring:
pathmatch:
matching-strategy: ANT_PATH_MATCHER
kafka:
- bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER:localhost:9092}
security:
protocol: PLAINTEXT
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- client-id: dmi-plugin
+ client-id: ncmp-dmi-plugin
app:
ncmp:
- async-m2m:
+ async:
topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
+notification:
+ async:
+ executor:
+ time-out-value-in-ms: 2000
+
# Actuator
management:
server:
@@ -106,4 +111,4 @@ springdoc:
urlsPrimaryName: query
urls:
- name: query
- url: /api-docs/openapi.yaml \ No newline at end of file
+ url: /api-docs/openapi.yaml
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
new file mode 100644
index 00000000..54c0fe09
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.async
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
+import org.onap.cps.ncmp.dmi.model.DataAccessRequest
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
+import org.spockframework.spring.SpringBean
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.http.HttpStatus
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonSerializer
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.spock.Testcontainers
+import org.testcontainers.utility.DockerImageName
+import spock.lang.Specification
+
+import java.time.Duration
+
+@SpringBootTest(classes = [AsyncTaskExecutor, DmiAsyncRequestResponseEventProducer])
+@Testcontainers
+@DirtiesContext
+class AsyncTaskExecutorIntegrationSpec extends Specification {
+
+ static kafkaTestContainer = new KafkaContainer(
+ DockerImageName.parse('confluentinc/cp-kafka:6.2.1')
+ )
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+ }
+
+ def setupSpec() {
+ kafkaTestContainer.start()
+ }
+
+ def producerConfigProperties = [
+ 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0],
+ 'retries' : 0,
+ 'batch.size' : 16384,
+ 'linger.ms' : 1,
+ 'buffer.memory' : 33554432,
+ 'key.serializer' : StringSerializer,
+ 'value.serializer' : JsonSerializer
+ ]
+
+ def consumerConfigProperties = [
+ 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0],
+ 'key.deserializer' : StringDeserializer,
+ 'value.deserializer': StringDeserializer,
+ 'auto.offset.reset' : 'earliest',
+ 'group.id' : 'test'
+ ]
+
+ def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties))
+
+ @SpringBean
+ DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer =
+ new DmiAsyncRequestResponseEventProducer(kafkaTemplate)
+
+ KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfigProperties)
+
+ def spiedObjectMapper = Spy(ObjectMapper)
+
+ def objectUnderTest = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer)
+
+ private static final String TEST_TOPIC = 'test-topic'
+
+ def setup() {
+ cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
+ consumer.subscribe([TEST_TOPIC] as List<String>)
+ }
+
+ def cleanup() {
+ consumer.close()
+ }
+
+ def 'Publish and Subscribe message - success'() {
+ when: 'a successful event is published'
+ objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
+ and: 'the topic is polled'
+ def records = consumer.poll(Duration.ofMillis(1500))
+ then: 'the record received is the event sent'
+ def record = records.iterator().next()
+ DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
+ and: 'the status & code matches expected'
+ assert event.getEventContent().getResponseStatus() == 'OK'
+ assert event.getEventContent().getResponseCode() == '200'
+ }
+
+ def 'Publish and Subscribe message - failure'() {
+ when: 'a failure event is published'
+ def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
+ objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', DataAccessRequest.OperationEnum.READ, exception)
+ and: 'the topic is polled'
+ def records = consumer.poll(Duration.ofMillis(1500))
+ then: 'the record received is the event sent'
+ def record = records.iterator().next()
+ DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
+ and: 'the status & code matches expected'
+ assert event.getEventContent().getResponseStatus() == 'Internal Server Error'
+ assert event.getEventContent().getResponseCode() == '500'
+ }
+
+ @DynamicPropertySource
+ static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+ dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+ }
+
+} \ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy
index 1541f8ca..5bfbc400 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy
@@ -21,11 +21,14 @@
package org.onap.cps.ncmp.dmi.rest.controller
+
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.exception.DmiException
import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService
+import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
+import org.onap.cps.ncmp.dmi.notifications.async.DmiAsyncRequestResponseEventProducer
+
import org.onap.cps.ncmp.dmi.service.model.ModuleReference
import org.onap.cps.ncmp.dmi.model.ModuleSet
import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas
@@ -38,6 +41,7 @@ import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
+import org.springframework.kafka.core.KafkaTemplate
import org.springframework.security.test.context.support.WithMockUser
import org.springframework.test.web.servlet.MockMvc
import spock.lang.Specification
@@ -53,7 +57,7 @@ import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.UPDATE
import static org.springframework.http.HttpStatus.CREATED
import static org.springframework.http.HttpStatus.OK
-@WebMvcTest(DmiRestController)
+@WebMvcTest(DmiRestController.class)
@WithMockUser
class DmiRestControllerSpec extends Specification {
@@ -64,7 +68,10 @@ class DmiRestControllerSpec extends Specification {
DmiService mockDmiService = Mock()
@SpringBean
- NcmpKafkaPublisherService mockNcmpKafkaPublisherService = Mock()
+ DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = new DmiAsyncRequestResponseEventProducer(Mock(KafkaTemplate))
+
+ @SpringBean
+ AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer)
@Value('${rest.api.dmi-base-path}/v1')
def basePathV1
@@ -256,6 +263,21 @@ class DmiRestControllerSpec extends Specification {
response.getContentAsString() == '{some-json}'
}
+ def 'PassThrough Returns OK when topic is used for async'(){
+ given: 'an endpoint'
+ def readPassThroughUrl ="${basePathV1}/ch/some-cmHandle/data/ds/ncmp-datastore:" +
+ resourceIdentifier +
+ '?resourceIdentifier=some-resourceIdentifier&topic=test-topic'
+ when: 'endpoint is invoked'
+ def jsonData = TestUtils.getResourceFileContent('readData.json')
+ def response = mvc.perform(
+ post(readPassThroughUrl).contentType(MediaType.APPLICATION_JSON).content(jsonData)
+ ).andReturn().response
+ then: 'response status is OK'
+ assert response.status == HttpStatus.NO_CONTENT.value()
+ where: 'the following values are used'
+ resourceIdentifier << ['passthrough-operational', 'passthrough-running']
+ }
def 'Get resource data for pass-through running with #scenario value in resource identifier param.'() {
given: 'Get resource data url'
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy
index e38d5c37..1d87b775 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy
@@ -29,7 +29,7 @@ import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
import org.onap.cps.ncmp.dmi.exception.DmiException
import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException
import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException
-import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound
+import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException
import org.onap.cps.ncmp.dmi.service.model.ModuleReference
import org.onap.cps.ncmp.dmi.model.YangResource
import org.onap.cps.ncmp.dmi.model.YangResources
@@ -221,7 +221,7 @@ class DmiServiceImplSpec extends Specification {
objectUnderTest.getResourceData(cmHandle,
resourceId, optionsParam, restConfQueryParam)
then: 'resource data not found'
- thrown(ResourceDataNotFound.class)
+ thrown(HttpClientRequestException.class)
}
def 'Get resource data for passthrough running.'() {
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
deleted file mode 100644
index f5bc4ac4..00000000
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service
-
-import spock.lang.Specification
-
-class NcmpKafkaPublisherServiceSpec extends Specification {
-
- def mockNcmpKafkaPublisher = Mock(NcmpKafkaPublisher)
- def objectUnderTest = new NcmpKafkaPublisherService(mockNcmpKafkaPublisher)
-
- def 'Message publishing'() {
- given: 'a sample message with key'
- def message = 'sample message'
- def messageKey = 'sample-key'
- when: 'published'
- objectUnderTest.publishToNcmp(messageKey, message)
- then: 'no exception is thrown'
- noExceptionThrown()
- and: 'message is published once'
- 1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message)
- }
-}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
deleted file mode 100644
index 00c8e6e7..00000000
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service
-
-import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.context.annotation.Bean
-import org.springframework.context.annotation.Configuration
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.serializer.JsonDeserializer
-import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.DynamicPropertyRegistry
-import org.springframework.test.context.DynamicPropertySource
-import org.testcontainers.containers.KafkaContainer
-import org.testcontainers.spock.Testcontainers
-import spock.lang.Specification
-
-import java.time.Duration
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
-
-@SpringBootTest
-@Testcontainers
-@DirtiesContext
-class NcmpKafkaPublisherSpec extends Specification {
-
- static kafkaTestContainer = new KafkaContainer()
- static {
- Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
- }
-
- def setupSpec() {
- kafkaTestContainer.start()
- }
-
- @Autowired
- KafkaTemplate<String, Object> kafkaTemplate
-
- @Value('${app.ncmp.async-m2m.topic}')
- String topic
-
- KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConsumerConfig())
-
- def 'Publish and Subscribe message'() {
- given: 'a sample messsage and key'
- def message = 'sample message'
- def messageKey = 'message-key'
- def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic)
- when: 'a message is published'
- objectUnderTest.sendMessage(messageKey, message)
- then: 'a message is consumed'
- consumer.subscribe([topic] as List<String>)
- def records = consumer.poll(Duration.ofMillis(1000))
- assert records.size() == 1
- assert messageKey == records[0].key
- assert message == records[0].value
- }
-
- def kafkaConsumerConfig() {
- def configs = [:]
- configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name)
- configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name)
- configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest')
- configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0])
- configs.put(GROUP_ID_CONFIG, 'test')
- return configs
- }
-
- @DynamicPropertySource
- static void registerKafkaProperties(DynamicPropertyRegistry registry) {
- registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
- }
-}
-
-@Configuration
-class TopicConfig {
- @Bean
- NewTopic newTopic() {
- return new NewTopic("my-topic-name", 1, (short) 1);
- }
-}
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index 0d3784f9..9ed37a73 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -47,19 +47,22 @@ dmi:
spring:
application:
name: ncmp-dmi-plugin
+ mvc:
+ pathmatch:
+ matching-strategy: ANT_PATH_MATCHER
kafka:
- bootstrap-servers: localhost:9092
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
security:
protocol: PLAINTEXT
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
- client-id: dmi-plugin
+ client-id: ncmp-dmi-plugin
app:
ncmp:
- async-m2m:
- topic: my-topic-name
+ async:
+ topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
logging:
format: json