summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authormpriyank <priyank.maheshwari@est.tech>2022-04-08 15:12:22 +0530
committermpriyank <priyank.maheshwari@est.tech>2022-04-11 19:31:37 +0530
commit78e1d0616474d50d1b3d156e588c6cea9cba783d (patch)
tree8f3c1d9c02be33194207966b0e56a87aeab0181c /src/main
parent08b499572102640cd6d298fec2abf443413803cb (diff)
NCMP Kafka Producer Infrastructure
- Producer configuration and Topic related information. - Accepted topicParameter and request id from NCMP to process async - Replacing Springfox and using Springdoc instead Issue-ID: CPS-829 Change-Id: I369b5ec6c16318220bb218701006918a0bf21419 Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java22
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java11
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java72
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java44
-rw-r--r--src/main/resources/application.yml15
5 files changed, 147 insertions, 17 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java
index 31a78111..6106c6af 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/config/DmiPluginConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2021-2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,29 +21,23 @@
package org.onap.cps.ncmp.dmi.config;
import lombok.Getter;
+import org.springdoc.core.GroupedOpenApi;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
-import springfox.documentation.builders.PathSelectors;
-import springfox.documentation.builders.RequestHandlerSelectors;
-import springfox.documentation.spi.DocumentationType;
-import springfox.documentation.spring.web.plugins.Docket;
@Configuration
public class DmiPluginConfig {
+
/**
- * Swagger-ui configuration.
+ * Swagger-ui configuration using springdoc.
*/
- @Bean("dmi-plugin-docket")
- public Docket api() {
- return new Docket(DocumentationType.OAS_30)
- .groupName("dmi-plugin-docket")
- .select()
- .apis(RequestHandlerSelectors.any())
- .paths(PathSelectors.any())
- .build();
+ @Bean("dmi-plugin-api")
+ public GroupedOpenApi api() {
+ return GroupedOpenApi.builder().group("dmi-plugin-api")
+ .pathsToMatch("/swagger-ui/**,/swagger-resources/**,/v3/api-docs").build();
}
@Getter
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
index 653ebf7f..d12b9eee 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/rest/controller/DmiRestController.java
@@ -40,6 +40,7 @@ import org.onap.cps.ncmp.dmi.model.YangResources;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginApi;
import org.onap.cps.ncmp.dmi.rest.api.DmiPluginInternalApi;
import org.onap.cps.ncmp.dmi.service.DmiService;
+import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService;
import org.onap.cps.ncmp.dmi.service.model.ModuleReference;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -56,6 +57,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
private final ObjectMapper objectMapper;
+ private final NcmpKafkaPublisherService ncmpKafkaPublisherService;
+
private static final Map<OperationEnum, HttpStatus> operationToHttpStatusMap = new HashMap<>(6);
static {
@@ -67,6 +70,7 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
operationToHttpStatusMap.put(OperationEnum.DELETE, HttpStatus.NO_CONTENT);
}
+
@Override
public ResponseEntity<ModuleSet> getModuleReferences(final String cmHandle,
final @Valid ModuleReferencesRequest body) {
@@ -107,6 +111,7 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
* @param cmHandle cm handle identifier
* @param dataAccessRequest data Access Request
* @param optionsParamInQuery options query parameter
+ * @param topicParamInQuery optional topic parameter
* @return {@code ResponseEntity} response entity
*/
@Override
@@ -114,7 +119,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
final String cmHandle,
final @Valid DataAccessRequest
dataAccessRequest,
- final @Valid String optionsParamInQuery) {
+ final @Valid String optionsParamInQuery,
+ final String topicParamInQuery) {
if (isReadOperation(dataAccessRequest)) {
final String resourceDataAsJson = dmiService.getResourceData(cmHandle,
resourceIdentifier,
@@ -130,7 +136,8 @@ public class DmiRestController implements DmiPluginApi, DmiPluginInternalApi {
final String cmHandle,
final @Valid DataAccessRequest
dataAccessRequest,
- final @Valid String optionsParamInQuery) {
+ final @Valid String optionsParamInQuery,
+ final String topicParamInQuery) {
final String sdncResponse;
if (isReadOperation(dataAccessRequest)) {
sdncResponse = dmiService.getResourceData(cmHandle,
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
new file mode 100644
index 00000000..373a09d7
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisher.java
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Component;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+@Component
+@Slf4j
+public class NcmpKafkaPublisher {
+
+ private final KafkaTemplate<String, Object> kafkaTemplate;
+ private final String topicName;
+
+ /**
+ * KafkaTemplate and Topic name.
+ *
+ * @param kafkaTemplate kafka template
+ * @param topicName topic name
+ */
+ @Autowired
+ public NcmpKafkaPublisher(final KafkaTemplate<String, Object> kafkaTemplate,
+ @Value("${app.ncmp.async-m2m.topic}") final String topicName) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.topicName = topicName;
+ }
+
+ /**
+ * Sends message to the configured topic with a message key.
+ *
+ * @param messageKey message key
+ * @param payload message payload
+ */
+ public void sendMessage(final String messageKey, final Object payload) {
+ final ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topicName, messageKey, payload);
+ send.addCallback(new ListenableFutureCallback<>() {
+ @Override
+ public void onFailure(final Throwable ex) {
+ log.warn("Failed to send the messages {}", ex.getMessage());
+ }
+
+ @Override
+ public void onSuccess(final SendResult<String, Object> result) {
+ log.debug("Sent message {}", result.getProducerRecord());
+ }
+ });
+ }
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java
new file mode 100644
index 00000000..f5e1839b
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherService.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.service;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@AllArgsConstructor
+public class NcmpKafkaPublisherService {
+
+ private final NcmpKafkaPublisher ncmpKafkaPublisher;
+
+ /**
+ * publish the message to NCMP.
+ *
+ * @param messageKey message key
+ * @param message message payload
+ */
+ public void publishToNcmp(final String messageKey, final Object message) {
+ log.debug("Publishing message : {} to NCMP with message-key : {}", message, messageKey);
+ ncmpKafkaPublisher.sendMessage(messageKey, message);
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 2d324c5b..71a689cd 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (C) 2021 Nordix Foundation
+# Copyright (C) 2021-2022 Nordix Foundation
# Modifications Copyright (C) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,6 +42,19 @@ spring:
mvc:
pathmatch:
matching-strategy: ANT_PATH_MATCHER
+ kafka:
+ bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVER}
+ security:
+ protocol: PLAINTEXT
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ client-id: dmi-plugin
+
+app:
+ ncmp:
+ async-m2m:
+ topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
# Actuator
management: