summaryrefslogtreecommitdiffstats
path: root/aai-core/src/main/java
diff options
context:
space:
mode:
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-10-08 11:52:14 +0200
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-10-09 11:12:06 +0200
commit180f2967c7db5cbbace67616146a66d9d9658af7 (patch)
tree106a9019968b489ca717ddeb7a3e93f5328ac509 /aai-core/src/main/java
parent6cc66cadc9db495883ce1211c1a0f712e77f8bc7 (diff)
Refactor UEBNotification class
- refactor UEBNotification class that it does not use ActiveMQ anymore and directly publishes to Kafka - use .domain.NotificationEvent class - serialize as late as possible Issue-ID: AAI-3931 Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de> Change-Id: I3836519752f810f905a9aed96678d497783a2e5d
Diffstat (limited to 'aai-core/src/main/java')
-rw-r--r--aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java11
-rw-r--r--aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java490
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java6
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java31
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java48
-rw-r--r--aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java47
-rw-r--r--aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java2
-rw-r--r--aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java8
-rw-r--r--aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java116
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java9
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java111
11 files changed, 297 insertions, 582 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java b/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java
index c04e4e3c..3cc1719c 100644
--- a/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java
+++ b/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java
@@ -32,8 +32,19 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.web.context.annotation.RequestScope;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule;
+
@Configuration
public class RestBeanConfig {
+
+ @Bean
+ public ObjectMapper objectMapper() {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(new JaxbAnnotationModule());
+ return objectMapper;
+ }
+
@Bean(name = "traversalUriHttpEntry")
@Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public HttpEntry traversalUriHttpEntry() {
diff --git a/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java b/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java
index dad35669..9117e998 100644
--- a/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java
+++ b/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java
@@ -17,188 +17,35 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-//
-// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, v2.2.4-2
-// See <a href="http://java.sun.com/xml/jaxb">http://java.sun.com/xml/jaxb</a>
-// Any modifications to this file will be lost upon recompilation of the source schema.
-// Generated on: 2016.01.06 at 05:38:00 PM EST
-//
-
package org.onap.aai.domain.notificationEvent;
import javax.xml.bind.annotation.*;
-import org.w3c.dom.Element;
+import com.fasterxml.jackson.annotation.JsonProperty;
-/**
- * <p>
- * Java class for anonymous complex type.
- *
- * <p>
- * The following schema fragment specifies the expected content contained within this class.
- *
- * <pre>
- * &lt;complexType>
- * &lt;complexContent>
- * &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- * &lt;sequence>
- * &lt;element name="cambria.partition" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="event-header" minOccurs="0">
- * &lt;complexType>
- * &lt;complexContent>
- * &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- * &lt;sequence>
- * &lt;element name="id" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="timestamp" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="source-name" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="domain" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="sequence-number" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="severity" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="event-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="version" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="action" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="top-entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="entity-link" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="status" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;/sequence>
- * &lt;/restriction>
- * &lt;/complexContent>
- * &lt;/complexType>
- * &lt;/element>
- * &lt;any processContents='lax' namespace='##other' minOccurs="0"/>
- * &lt;/sequence>
- * &lt;/restriction>
- * &lt;/complexContent>
- * &lt;/complexType>
- * </pre>
- *
- *
- */
+import lombok.Data;
+
+@Data
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(name = "", propOrder = {"cambriaPartition", "eventHeader", "entity"})
@XmlRootElement(name = "NotificationEvent")
public class NotificationEvent {
@XmlElement(name = "cambria.partition")
+ @JsonProperty("cambria.partition")
protected String cambriaPartition;
@XmlElement(name = "event-header")
+ @JsonProperty("event-header")
protected EventHeader eventHeader;
@XmlAnyElement(lax = true)
protected Object entity;
- /**
- * Gets the value of the eventHeader property.
- *
- * @return
- * possible object is
- * {@link EventHeader }
- *
- */
- public EventHeader getEventHeader() {
- return eventHeader;
- }
-
- /**
- * Sets the value of the eventHeader property.
- *
- * @param value
- * allowed object is
- * {@link EventHeader }
- *
- */
- public void setEventHeader(EventHeader value) {
- this.eventHeader = value;
- }
-
- /**
- * Gets the value of the any property.
- *
- * @return
- * possible object is
- * {@link Object }
- * {@link Element }
- *
- */
- public Object getEntity() {
- return entity;
- }
-
- /**
- * Sets the value of the any property.
- *
- * @param value
- * allowed object is
- * {@link Object }
- * {@link Element }
- *
- */
- public void setEntity(Object value) {
- this.entity = value;
- }
-
- /**
- * Gets the value of the cambriaPartition property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getCambriaPartition() {
- return cambriaPartition;
- }
-
- /**
- * Sets the value of the cambriaPartition property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setCambriaPartition(String value) {
- this.cambriaPartition = value;
- }
-
- /**
- * <p>
- * Java class for anonymous complex type.
- *
- * <p>
- * The following schema fragment specifies the expected content contained within this class.
- *
- * <pre>
- * &lt;complexType>
- * &lt;complexContent>
- * &lt;restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- * &lt;sequence>
- * &lt;element name="id" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="timestamp" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="source-name" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="domain" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="sequence-number" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="severity" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="event-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="version" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="action" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="top-entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="entity-link" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;element name="status" type="{http://www.w3.org/2001/XMLSchema}string"/>
- * &lt;/sequence>
- * &lt;/restriction>
- * &lt;/complexContent>
- * &lt;/complexType>
- * </pre>
- *
- *
- */
+ @Data
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType(
name = "",
propOrder = {"id", "timestamp", "sourceName", "domain", "sequenceNumber", "severity", "eventType",
- "version", "action", "entityType", "topEntityType", "entityLink", "status"})
+ "version", "action", "entityType", "topEntityType", "entityLink"})
public static class EventHeader {
@XmlElement(required = true)
@@ -206,340 +53,31 @@ public class NotificationEvent {
@XmlElement(required = true)
protected String timestamp;
@XmlElement(name = "source-name", required = true)
+ @JsonProperty("source-name")
protected String sourceName;
@XmlElement(required = true)
protected String domain;
@XmlElement(name = "sequence-number", required = true)
+ @JsonProperty("sequence-number")
protected String sequenceNumber;
@XmlElement(required = true)
protected String severity;
@XmlElement(name = "event-type", required = true)
+ @JsonProperty("event-type")
protected String eventType;
@XmlElement(required = true)
protected String version;
@XmlElement(required = true)
protected String action;
@XmlElement(name = "entity-type", required = true)
+ @JsonProperty("entity-type")
protected String entityType;
@XmlElement(name = "top-entity-type", required = true)
+ @JsonProperty("top-entity-type")
protected String topEntityType;
@XmlElement(name = "entity-link", required = true)
+ @JsonProperty("entity-link")
protected String entityLink;
- @XmlElement(required = true)
- protected String status;
-
- /**
- * Gets the value of the id property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getId() {
- return id;
- }
-
- /**
- * Sets the value of the id property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setId(String value) {
- this.id = value;
- }
-
- /**
- * Gets the value of the timestamp property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getTimestamp() {
- return timestamp;
- }
-
- /**
- * Sets the value of the timestamp property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setTimestamp(String value) {
- this.timestamp = value;
- }
-
- /**
- * Gets the value of the sourceName property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getSourceName() {
- return sourceName;
- }
-
- /**
- * Sets the value of the sourceName property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setSourceName(String value) {
- this.sourceName = value;
- }
-
- /**
- * Gets the value of the domain property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getDomain() {
- return domain;
- }
-
- /**
- * Sets the value of the domain property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setDomain(String value) {
- this.domain = value;
- }
-
- /**
- * Gets the value of the sequenceNumber property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getSequenceNumber() {
- return sequenceNumber;
- }
-
- /**
- * Sets the value of the sequenceNumber property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setSequenceNumber(String value) {
- this.sequenceNumber = value;
- }
-
- /**
- * Gets the value of the severity property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getSeverity() {
- return severity;
- }
-
- /**
- * Sets the value of the severity property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setSeverity(String value) {
- this.severity = value;
- }
-
- /**
- * Gets the value of the eventType property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getEventType() {
- return eventType;
- }
-
- /**
- * Sets the value of the eventType property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setEventType(String value) {
- this.eventType = value;
- }
-
- /**
- * Gets the value of the version property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getVersion() {
- return version;
- }
-
- /**
- * Sets the value of the version property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setVersion(String value) {
- this.version = value;
- }
-
- /**
- * Gets the value of the action property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getAction() {
- return action;
- }
-
- /**
- * Sets the value of the action property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setAction(String value) {
- this.action = value;
- }
-
- /**
- * Gets the value of the entityType property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getEntityType() {
- return entityType;
- }
-
- /**
- * Sets the value of the entityType property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setEntityType(String value) {
- this.entityType = value;
- }
-
- /**
- * Gets the value of the topEntityType property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getTopEntityType() {
- return topEntityType;
- }
-
- /**
- * Sets the value of the topEntityType property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setTopEntityType(String value) {
- this.topEntityType = value;
- }
-
- /**
- * Gets the value of the entityLink property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getEntityLink() {
- return entityLink;
- }
-
- /**
- * Sets the value of the entityLink property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setEntityLink(String value) {
- this.entityLink = value;
- }
-
- /**
- * Gets the value of the status property.
- *
- * @return
- * possible object is
- * {@link String }
- *
- */
- public String getStatus() {
- return status;
- }
-
- /**
- * Sets the value of the status property.
- *
- * @param value
- * allowed object is
- * {@link String }
- *
- */
- public void setStatus(String value) {
- this.status = value;
- }
-
}
}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
index d6a491ef..09fc68a2 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
@@ -22,6 +22,12 @@ package org.onap.aai.kafka;
import org.json.JSONObject;
+/**
+ * MessageProducer interface based on untyped messages
+ *
+ * @deprecated use {@link org.onap.aai.kafka.NotificationProducer} instead
+ */
+@Deprecated
public interface MessageProducer {
void sendMessageToDefaultDestination(JSONObject finalJson);
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java
new file mode 100644
index 00000000..3c739174
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java
@@ -0,0 +1,31 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.kafka;
+
+import org.onap.aai.domain.notificationEvent.NotificationEvent;
+import org.onap.aai.rest.notification.UEBNotification;
+import org.springframework.stereotype.Service;
+
+@Service
+public interface NotificationProducer {
+ public void sendNotification(NotificationEvent notificationEvent);
+ public void sendUEBNotification(UEBNotification uebNotification);
+}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java
new file mode 100644
index 00000000..44a03ba1
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java
@@ -0,0 +1,48 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.kafka;
+
+import org.onap.aai.domain.notificationEvent.NotificationEvent;
+import org.onap.aai.rest.notification.UEBNotification;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+import lombok.RequiredArgsConstructor;
+
+@Service
+@RequiredArgsConstructor
+public class NotificationProducerService implements NotificationProducer {
+
+ private final KafkaTemplate<String,NotificationEvent> kafkaTemplate;
+ @Value("${aai.notifications.enabled:true}") boolean notificationsEnabled;
+
+ public void sendNotification(NotificationEvent notificationEvent) {
+ if(notificationsEnabled) {
+ kafkaTemplate.send("AAI-EVENT", notificationEvent);
+ }
+ }
+
+ public void sendUEBNotification(UEBNotification uebNotification) {
+ uebNotification.getEvents().stream()
+ .forEach(this::sendNotification);
+ }
+}
diff --git a/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java b/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java
index 093062a9..9d8f5fdb 100644
--- a/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java
+++ b/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java
@@ -22,8 +22,8 @@
package org.onap.aai.prevalidation;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
@@ -41,9 +41,10 @@ import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.http.conn.ConnectTimeoutException;
+import org.onap.aai.domain.notificationEvent.NotificationEvent;
+import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader;
import org.onap.aai.exceptions.AAIException;
-import org.onap.aai.introspection.Introspector;
-import org.onap.aai.rest.notification.NotificationEvent;
+
import org.onap.aai.restclient.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,22 +75,19 @@ public class ValidationService {
static final String VALIDATION_HEALTH_ENDPOINT = "/v1/info";
private static final Logger LOGGER = LoggerFactory.getLogger(ValidationService.class);
- private static final String ENTITY_TYPE = "entity-type";
- private static final String ACTION = "action";
- private static final String SOURCE_NAME = "source-name";
private static final String DELETE = "DELETE";
private final RestClient validationRestClient;
private final String appName;
private final Set<String> validationNodeTypes;
- private final Gson gson;
-
- private List<Pattern> exclusionList;
+ private final ObjectMapper mapper;
+ private final List<Pattern> exclusionList;
public ValidationService(@Qualifier("validationRestClient") RestClient validationRestClient,
@Value("${spring.application.name}") String appName,
@Value("${validation.service.node-types}") String validationNodes,
- @Value("${validation.service.exclusion-regexes:#{null}}") String exclusionRegexes) {
+ @Value("${validation.service.exclusion-regexes:#{null}}") String exclusionRegexes,
+ ObjectMapper mapper) {
this.validationRestClient = validationRestClient;
this.appName = appName;
@@ -101,7 +99,7 @@ public class ValidationService {
this.exclusionList =
Arrays.stream(exclusionRegexes.split(",")).map(Pattern::compile).collect(Collectors.toList());
}
- this.gson = new Gson();
+ this.mapper = mapper;
LOGGER.info("Successfully initialized the pre validation service");
}
@@ -119,7 +117,7 @@ public class ValidationService {
ResponseEntity<String> healthCheckResponse = null;
try {
healthCheckResponse =
- validationRestClient.execute(VALIDATION_HEALTH_ENDPOINT, HttpMethod.GET, httpHeaders, null);
+ validationRestClient.execute(VALIDATION_HEALTH_ENDPOINT, HttpMethod.GET, httpHeaders);
} catch (Exception ex) {
AAIException validationException = new AAIException("AAI_4021", ex);
throw validationException;
@@ -142,7 +140,7 @@ public class ValidationService {
}
for (NotificationEvent event : notificationEvents) {
- Introspector eventHeader = event.getEventHeader();
+ EventHeader eventHeader = event.getEventHeader();
if (eventHeader == null) {
// Should I skip processing the request and let it continue
// or fail the request and cause client impact
@@ -156,10 +154,10 @@ public class ValidationService {
if (isDelete(eventHeader)) {
continue;
}
- String entityType = eventHeader.getValue(ENTITY_TYPE);
+ String entityType = eventHeader.getEntityType();
if (this.shouldValidate(entityType)) {
- List<String> violations = preValidate(event.getNotificationEvent());
+ List<String> violations = preValidate(event);
if (!violations.isEmpty()) {
AAIException aaiException = new AAIException("AAI_4019");
aaiException.getTemplateVars().addAll(violations);
@@ -172,8 +170,8 @@ public class ValidationService {
/**
* Determine if event is of type delete
*/
- private boolean isDelete(Introspector eventHeader) {
- String action = eventHeader.getValue(ACTION);
+ private boolean isDelete(EventHeader eventHeader) {
+ String action = eventHeader.getAction();
return DELETE.equalsIgnoreCase(action);
}
@@ -186,15 +184,15 @@ public class ValidationService {
// Get the first notification and if the source of that notification
// is in one of the regexes then we skip sending it to validation
NotificationEvent notification = notificationEvents.get(0);
- Introspector eventHeader = notification.getEventHeader();
+ EventHeader eventHeader = notification.getEventHeader();
if (eventHeader != null) {
- String source = eventHeader.getValue(SOURCE_NAME);
+ String source = eventHeader.getSourceName();
return exclusionList.stream().anyMatch(pattern -> pattern.matcher(source).matches());
}
return false;
}
- public List<String> preValidate(String body) throws AAIException {
+ public List<String> preValidate(NotificationEvent notificationEvent) throws AAIException {
Map<String, String> httpHeaders = new HashMap<>();
httpHeaders.put("X-FromAppId", appName);
httpHeaders.put("X-TransactionID", UUID.randomUUID().toString());
@@ -203,7 +201,8 @@ public class ValidationService {
List<String> violations = new ArrayList<>();
ResponseEntity<String> responseEntity;
try {
- responseEntity = validationRestClient.execute(VALIDATION_ENDPOINT, HttpMethod.POST, httpHeaders, body);
+ String requestBody = mapper.writeValueAsString(notificationEvent);
+ responseEntity = validationRestClient.execute(VALIDATION_ENDPOINT, HttpMethod.POST, httpHeaders, requestBody);
Object responseBody = responseEntity.getBody();
if (isSuccess(responseEntity)) {
LOGGER.debug("Validation Service returned following response status code {} and body {}",
@@ -242,8 +241,8 @@ public class ValidationService {
private Validation getValidation(Object responseBody) {
Validation validation = null;
try {
- validation = gson.fromJson(responseBody.toString(), Validation.class);
- } catch (JsonSyntaxException jsonException) {
+ validation = mapper.readValue(responseBody.toString(), Validation.class);
+ } catch (JsonProcessingException jsonException) {
LOGGER.warn("Unable to convert the response body {}", jsonException.getMessage());
}
return validation;
diff --git a/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java b/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java
index c4f84396..dfebd6e7 100644
--- a/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java
+++ b/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java
@@ -130,7 +130,7 @@ public class HttpEntry {
this.dbEngine = new JanusGraphDBEngine(queryStyle, loader);
getDbEngine().startTransaction();
- this.notification = new UEBNotification(loader, loaderFactory, schemaVersions);
+ this.notification = new UEBNotification(loaderFactory, schemaVersions);
if ("true".equals(AAIConfig.get("aai.notification.depth.all.enabled", "true"))) {
this.notificationDepth = AAIProperties.MAXIMUM_DEPTH;
} else {
diff --git a/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java b/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java
index 9c3dde15..c2770692 100644
--- a/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java
+++ b/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java
@@ -34,6 +34,7 @@ import org.onap.aai.db.props.AAIProperties;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.introspection.Introspector;
import org.onap.aai.introspection.LoaderFactory;
+import org.onap.aai.kafka.NotificationProducer;
import org.onap.aai.prevalidation.ValidationService;
import org.onap.aai.serialization.db.DBSerializer;
import org.onap.aai.serialization.engines.query.QueryEngine;
@@ -52,6 +53,7 @@ public class NotificationService {
public static final Logger LOGGER = LoggerFactory.getLogger(NotificationService.class);
private final ValidationService validationService;
+ private final NotificationProducer notificationProducer;
private final LoaderFactory loaderFactory;
private final boolean isDeltaEventsEnabled;
private final String basePath;
@@ -60,11 +62,13 @@ public class NotificationService {
@Nullable ValidationService validationService,
LoaderFactory loaderFactory,
@Value("${schema.uri.base.path}") String basePath,
- @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) {
+ @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled,
+ NotificationProducer notificationProducer) {
this.validationService = validationService;
this.loaderFactory = loaderFactory;
this.basePath = basePath;
this.isDeltaEventsEnabled = isDeltaEventsEnabled;
+ this.notificationProducer = notificationProducer;
}
/**
@@ -99,7 +103,7 @@ public class NotificationService {
validationService.validate(notification.getEvents());
}
- notification.triggerEvents();
+ notificationProducer.sendUEBNotification(notification);
if (isDeltaEventsEnabled) {
try {
DeltaEvents deltaEvents = new DeltaEvents(transactionId, sourceOfTruth, schemaVersion.toString(),
diff --git a/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java b/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java
index a113cf14..d5803f9c 100644
--- a/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java
+++ b/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java
@@ -4,6 +4,8 @@
* ================================================================================
* Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
+ * Modifications Copyright © 2024 Deutsche Telekom.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -30,6 +32,7 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
+import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.introspection.Introspector;
import org.onap.aai.introspection.Loader;
@@ -41,59 +44,31 @@ import org.onap.aai.logging.LogFormatTools;
import org.onap.aai.parsers.uri.URIToObject;
import org.onap.aai.setup.SchemaVersion;
import org.onap.aai.setup.SchemaVersions;
+import org.onap.aai.util.AAIConfig;
+import org.onap.aai.util.AAIConstants;
+import org.onap.aai.util.FormatDate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * The Class UEBNotification.
- */
public class UEBNotification {
private static final Logger LOGGER = LoggerFactory.getLogger(UEBNotification.class);
+ private static final FormatDate FORMAT_DATE = new FormatDate("YYYYMMdd-HH:mm:ss:SSS");
+ private static final String EVENT_TYPE = "AAI-EVENT";
- private Loader currentVersionLoader = null;
- protected Map<String, NotificationEvent> events = null;
- private SchemaVersion notificationVersion = null;
-
- /**
- * Instantiates a new UEB notification.
- *
- * @param loader the loader
- */
- public UEBNotification(Loader loader, LoaderFactory loaderFactory, SchemaVersions schemaVersions) {
- events = new LinkedHashMap<>();
- SchemaVersion defaultVersion = schemaVersions.getDefaultVersion();
- currentVersionLoader = loaderFactory.createLoaderForVersion(loader.getModelType(), defaultVersion);
- notificationVersion = defaultVersion;
- }
+ private final String domain = AAIConfig.get("aai.notificationEvent.default.domain", "UNK");
+ private final String sequenceNumber = AAIConfig.get("aai.notificationEvent.default.sequenceNumber", "UNK");
+ private final String severity = AAIConfig.get("aai.notificationEvent.default.severity", "UNK");
+ private final Map<String, org.onap.aai.domain.notificationEvent.NotificationEvent> events;
+ private final Loader currentVersionLoader;
+ private final SchemaVersion notificationVersion;
- /**
- * Instantiates a new UEB notification.
- *
- * @param modelType - Model type
- * @param loaderFactory - the loader factory
- * @param schemaVersions the schema versions bean
- */
- public UEBNotification(ModelType modelType, LoaderFactory loaderFactory, SchemaVersions schemaVersions) {
+ public UEBNotification(LoaderFactory loaderFactory, SchemaVersions schemaVersions) {
events = new LinkedHashMap<>();
- SchemaVersion defaultVersion = schemaVersions.getDefaultVersion();
- currentVersionLoader = loaderFactory.createLoaderForVersion(modelType, defaultVersion);
- notificationVersion = defaultVersion;
+ notificationVersion = schemaVersions.getDefaultVersion();
+ currentVersionLoader = loaderFactory.createLoaderForVersion(ModelType.MOXY, notificationVersion);
}
- /**
- * Creates the notification event.
- *
- * @param transactionId the X-TransactionId
- * @param sourceOfTruth
- * @param status the status
- * @param uri the uri
- * @param obj the obj
- * @param basePath base URI path
- * @throws AAIException the AAI exception
- * @throws IllegalArgumentException the illegal argument exception
- * @throws UnsupportedEncodingException the unsupported encoding exception
- */
public void createNotificationEvent(String transactionId, String sourceOfTruth, Status status, URI uri,
Introspector obj, HashMap<String, Introspector> relatedObjects, String basePath)
throws AAIException, UnsupportedEncodingException {
@@ -102,25 +77,32 @@ public class UEBNotification {
try {
EntityConverter entityConverter = new EntityConverter(new URIToObject(currentVersionLoader, uri, relatedObjects));
- Introspector eventHeader = currentVersionLoader.introspectorFromName("notification-event-header");
+ EventHeader eventHeader = new EventHeader();
basePath = formatBasePath(basePath);
-
String entityLink = formatEntityLink(uri, basePath);
-
- eventHeader.setValue("entity-link", entityLink);
- eventHeader.setValue("action", action);
- eventHeader.setValue("entity-type", obj.getDbName());
- eventHeader.setValue("top-entity-type", entityConverter.getTopEntityName());
- eventHeader.setValue("source-name", sourceOfTruth);
- eventHeader.setValue("version", notificationVersion.toString());
- eventHeader.setValue("id", transactionId);
-
-
- Introspector eventObject = entityConverter.convert(obj);
-
- final NotificationEvent event =
- new NotificationEvent(currentVersionLoader, eventHeader, eventObject, transactionId, sourceOfTruth);
+ eventHeader.setEntityLink(entityLink);
+ eventHeader.setAction(action);
+ eventHeader.setEntityType(obj.getDbName());
+ eventHeader.setTopEntityType(entityConverter.getTopEntityName());
+ eventHeader.setSourceName(sourceOfTruth);
+ eventHeader.setVersion(notificationVersion.toString());
+ eventHeader.setId(transactionId);
+
+ // default values
+ eventHeader.setTimestamp(FORMAT_DATE.getDateTime());
+ eventHeader.setEventType(EVENT_TYPE);
+ eventHeader.setDomain(domain);
+ eventHeader.setSequenceNumber(sequenceNumber);
+ eventHeader.setSeverity(severity);
+
+ Introspector entity = entityConverter.convert(obj);
+
+ final org.onap.aai.domain.notificationEvent.NotificationEvent event =
+ new org.onap.aai.domain.notificationEvent.NotificationEvent();
+ event.setEventHeader(eventHeader);
+ event.setCambriaPartition(AAIConstants.UEB_PUB_PARTITION_AAI);
+ event.setEntity(entity);
events.put(uri.toString(), event);
} catch (AAIUnknownObjectException e) {
throw new RuntimeException("Fatal error - notification-event-header object not found!");
@@ -172,26 +154,10 @@ public class UEBNotification {
return action;
}
- /**
- * Trigger events.
- *
- * @throws AAIException the AAI exception
- */
- public void triggerEvents() throws AAIException {
- for (NotificationEvent event : events.values()) {
- event.trigger();
- }
- clearEvents();
- }
-
- public List<NotificationEvent> getEvents() {
+ public List<org.onap.aai.domain.notificationEvent.NotificationEvent> getEvents() {
return new ArrayList<>(this.events.values());
}
- public Map<String, NotificationEvent> getEventsMap() {
- return this.events;
- }
-
private String getUri(String uri, String basePath) {
if (uri == null || uri.isEmpty()) {
return "";
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
index c1e8357d..d201134f 100644
--- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
@@ -36,6 +36,7 @@ import org.onap.aai.introspection.LoaderFactory;
import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
import org.onap.aai.kafka.MessageProducer;
+import org.onap.aai.kafka.NotificationProducer;
import org.onap.aai.rest.notification.NotificationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,8 +148,8 @@ public class KafkaConfig {
} else {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
@@ -179,7 +180,7 @@ public class KafkaConfig {
@ConditionalOnMissingBean
public NotificationService notificationService(LoaderFactory loaderFactory,
@Value("${schema.uri.base.path}") String basePath,
- @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) {
- return new NotificationService(null, loaderFactory, basePath, isDeltaEventsEnabled);
+ @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled, NotificationProducer notificationProducer) {
+ return new NotificationService(null, loaderFactory, basePath, isDeltaEventsEnabled, notificationProducer);
}
}
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java
new file mode 100644
index 00000000..09d8b26c
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java
@@ -0,0 +1,111 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.web;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.onap.aai.domain.notificationEvent.NotificationEvent;
+import org.onap.aai.kafka.NotificationProducer;
+import org.onap.aai.kafka.NotificationProducerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+public class KafkaNotificationEventConfig {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaNotificationEventConfig.class);
+
+ @Value("${jms.bind.address}")
+ private String bindAddress;
+
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.producer.properties.security.protocol}")
+ private String securityProtocol;
+
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config:#{null}}")
+ private String saslJaasConfig;
+
+ @Value("${spring.kafka.producer.retries:3}")
+ private String retries;
+
+ private Map<String, Object> buildKafkaProperties() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ if (bootstrapServers == null) {
+ logger.error("Environment Variable " + bootstrapServers + " is missing");
+ throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+ } else {
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+
+ if (saslJaasConfig == null) {
+ logger.info("Not using any authentication for kafka interaction");
+ } else {
+ logger.info("Using authentication provided by kafka interaction");
+ // Strimzi Kafka security properties
+ props.put("security.protocol", securityProtocol);
+ props.put("sasl.mechanism", saslMechanism);
+ props.put("sasl.jaas.config", saslJaasConfig);
+ }
+ return props;
+ }
+
+ @Bean
+ public ProducerFactory<String, NotificationEvent> notificationEventProducerFactory() throws Exception {
+ Map<String, Object> props = buildKafkaProperties();
+
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate<String, NotificationEvent> kafkaNotificationEventTemplate(ProducerFactory<String, NotificationEvent> producerFactory) throws Exception {
+ try {
+
+ return new KafkaTemplate<>(producerFactory);
+ } catch (Exception e) {
+ String smth = "";
+ return null;
+ }
+ }
+
+ @Bean
+ public NotificationProducer notificationProducer(KafkaTemplate<String,NotificationEvent> kafkaTemplate) {
+ return new NotificationProducerService(kafkaTemplate);
+ }
+}