summaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java127
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java104
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java64
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java20
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java12
5 files changed, 249 insertions, 78 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
new file mode 100644
index 00000000..cb617f9e
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.config.kafka;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+/**
+ * kafka Configuration for legacy and cloud events.
+ *
+ * @param <T> valid legacy event to be published over the wire.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class KafkaConfig<T> {
+
+ private final KafkaProperties kafkaProperties;
+
+ /**
+ * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
+ * application.yml and replaces value-serializer by JsonSerializer.
+ *
+ * @return legacy event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, T> legacyEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
+ * into application.yml and replaces deserializer-value by JsonDeserializer.
+ *
+ * @return an instance of legacy consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
+ * application.yml with CloudEventSerializer.
+ *
+ * @return cloud event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
+ * into application.yml having CloudEventDeserializer as deserializer-value.
+ *
+ * @return an instance of cloud consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+ *
+ * @return an instance of legacy Kafka template.
+ */
+ @Bean
+ @Primary
+ public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+ /**
+ * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+ *
+ * @return an instance of cloud Kafka template.
+ */
+ @Bean
+ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java
new file mode 100644
index 00000000..b8bd277d
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.time.format.DateTimeFormatter;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Data;
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch;
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate;
+import org.onap.cps.ncmp.events.avc1_0_0.Value;
+
+/**
+ * Helper to create AvcEvents.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DmiDataAvcCloudEventCreator {
+
+ private static final DateTimeFormatter dateTimeFormatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Creates CloudEvent for DMI Data AVC.
+ *
+ * @param eventCorrelationId correlationid
+ * @return Cloud Event
+ */
+ public static CloudEvent createCloudEvent(final String eventCorrelationId) {
+
+ CloudEvent cloudEvent = null;
+
+ try {
+ cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("NCMP"))
+ .withType(AvcEvent.class.getName())
+ .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0"))
+ .withExtension("correlationid", eventCorrelationId)
+ .withData(objectMapper.writeValueAsBytes(createDmiDataAvcEvent())).build();
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage());
+ }
+
+ return cloudEvent;
+ }
+
+ private static AvcEvent createDmiDataAvcEvent() {
+ final AvcEvent avcEvent = new AvcEvent();
+ final Data data = new Data();
+ final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate();
+ final DatastoreChanges datastoreChanges = new DatastoreChanges();
+ final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch();
+ ietfYangPatchYangPatch.setPatchId("abcd");
+ final Edit edit1 = new Edit();
+ final Value value = new Value();
+ final Map<String, Object> attributeMap = new LinkedHashMap<>();
+ attributeMap.put("isHoAllowed", false);
+ value.setAttributes(List.of(attributeMap));
+ edit1.setEditId("editId");
+ edit1.setOperation("replace");
+ edit1.setTarget("target_xpath");
+ edit1.setValue(value);
+ ietfYangPatchYangPatch.setEdit(List.of(edit1));
+ datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch);
+ pushChangeUpdate.setDatastoreChanges(datastoreChanges);
+ data.setPushChangeUpdate(pushChangeUpdate);
+
+ avcEvent.setData(data);
+ return avcEvent;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java
deleted file mode 100644
index 03ed1c4c..00000000
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.notifications.avc;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
-
-/**
- * Helper to create AvcEvents.
- */
-@Slf4j
-public class DmiDataAvcEventCreator {
-
- private static final DateTimeFormatter dateTimeFormatter
- = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-
- /**
- * Create an AVC event.
- *
- * @param eventCorrelationId the event correlation id
- * @return DmiAsyncRequestResponseEvent
- */
- public AvcEvent createEvent(final String eventCorrelationId) {
- final AvcEvent avcEvent = new AvcEvent();
- avcEvent.setEventId(UUID.randomUUID().toString());
- avcEvent.setEventCorrelationId(eventCorrelationId);
- avcEvent.setEventType(AvcEvent.class.getName());
- avcEvent.setEventSchema("urn:cps:" + AvcEvent.class.getName());
- avcEvent.setEventSchemaVersion("v1");
- avcEvent.setEventSource("NCMP");
- avcEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
-
- final Map<String, Object> eventPayload = new LinkedHashMap<>();
- eventPayload.put("push-change-update", "{}");
- avcEvent.setEvent(eventPayload);
-
- log.debug("Avc Event Created ID: {}", avcEvent.getEventId());
- return avcEvent;
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
index 4fd46b66..075dcf20 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
@@ -20,9 +20,11 @@
package org.onap.cps.ncmp.dmi.notifications.avc;
+
+import io.cloudevents.CloudEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@@ -31,16 +33,18 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class DmiDataAvcEventProducer {
- private final KafkaTemplate<String, AvcEvent> kafkaTemplate;
-
+ private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
/**
- * Sends message to the configured topic with a message key.
+ * Publishing DMI Data AVC event payload as CloudEvent.
*
- * @param requestId the request id
- * @param avcEvent the event to publish
+ * @param requestId the request id
+ * @param cloudAvcEvent event with data as DMI DataAVC event
*/
- public void sendMessage(final String requestId, final AvcEvent avcEvent) {
- kafkaTemplate.send("dmi-cm-events", requestId, avcEvent);
+ public void publishDmiDataAvcCloudEvent(final String requestId, final CloudEvent cloudAvcEvent) {
+ final ProducerRecord<String, CloudEvent> producerRecord =
+ new ProducerRecord<>("dmi-cm-events", requestId, cloudAvcEvent);
+ cloudEventKafkaTemplate.send(producerRecord);
log.debug("AVC event sent");
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
index f7f4bf96..c5fb8fbe 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
@@ -20,10 +20,10 @@
package org.onap.cps.ncmp.dmi.notifications.avc;
+import io.cloudevents.CloudEvent;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@@ -42,18 +42,18 @@ public class DmiDataAvcEventSimulationController {
/**
* Simulate Event for AVC.
+ *
* @param numberOfSimulatedEvents number of events to be generated
* @return ResponseEntity
*/
@GetMapping(path = "/v1/simulateDmiDataEvent")
- public ResponseEntity<Void> simulateEvents(@RequestParam("numberOfSimulatedEvents")
- final Integer numberOfSimulatedEvents) {
- final DmiDataAvcEventCreator dmiDataAvcEventCreator = new DmiDataAvcEventCreator();
+ public ResponseEntity<Void> simulateEvents(
+ @RequestParam("numberOfSimulatedEvents") final Integer numberOfSimulatedEvents) {
for (int i = 0; i < numberOfSimulatedEvents; i++) {
final String eventCorrelationId = UUID.randomUUID().toString();
- final AvcEvent avcEvent = dmiDataAvcEventCreator.createEvent(eventCorrelationId);
- dmiDataAvcEventProducer.sendMessage(eventCorrelationId, avcEvent);
+ final CloudEvent cloudEvent = DmiDataAvcCloudEventCreator.createCloudEvent(eventCorrelationId);
+ dmiDataAvcEventProducer.publishDmiDataAvcCloudEvent(eventCorrelationId, cloudEvent);
}
return new ResponseEntity<>(HttpStatus.OK);