From 180f2967c7db5cbbace67616146a66d9d9658af7 Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Tue, 8 Oct 2024 11:52:14 +0200 Subject: Refactor UEBNotification class - refactor UEBNotification class that it does not use ActiveMQ anymore and directly publishes to Kafka - use .domain.NotificationEvent class - serialize as late as possible Issue-ID: AAI-3931 Signed-off-by: Fiete Ostkamp Change-Id: I3836519752f810f905a9aed96678d497783a2e5d --- .../java/org/onap/aai/config/RestBeanConfig.java | 11 + .../notificationEvent/NotificationEvent.java | 490 +-------------------- .../java/org/onap/aai/kafka/MessageProducer.java | 6 + .../org/onap/aai/kafka/NotificationProducer.java | 31 ++ .../aai/kafka/NotificationProducerService.java | 48 ++ .../onap/aai/prevalidation/ValidationService.java | 47 +- .../main/java/org/onap/aai/rest/db/HttpEntry.java | 2 +- .../aai/rest/notification/NotificationService.java | 8 +- .../aai/rest/notification/UEBNotification.java | 116 ++--- .../main/java/org/onap/aai/web/KafkaConfig.java | 9 +- .../onap/aai/web/KafkaNotificationEventConfig.java | 111 +++++ 11 files changed, 297 insertions(+), 582 deletions(-) create mode 100644 aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java create mode 100644 aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java create mode 100644 aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java (limited to 'aai-core/src/main/java') diff --git a/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java b/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java index c04e4e3c..3cc1719c 100644 --- a/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java +++ b/aai-core/src/main/java/org/onap/aai/config/RestBeanConfig.java @@ -32,8 +32,19 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import org.springframework.web.context.annotation.RequestScope; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule; + @Configuration public class RestBeanConfig { + + @Bean + public ObjectMapper objectMapper() { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JaxbAnnotationModule()); + return objectMapper; + } + @Bean(name = "traversalUriHttpEntry") @Scope(scopeName = ConfigurableBeanFactory.SCOPE_PROTOTYPE) public HttpEntry traversalUriHttpEntry() { diff --git a/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java b/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java index dad35669..9117e998 100644 --- a/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/domain/notificationEvent/NotificationEvent.java @@ -17,188 +17,35 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -// -// This file was generated by the JavaTM Architecture for XML Binding(JAXB) Reference Implementation, v2.2.4-2 -// See http://java.sun.com/xml/jaxb -// Any modifications to this file will be lost upon recompilation of the source schema. -// Generated on: 2016.01.06 at 05:38:00 PM EST -// - package org.onap.aai.domain.notificationEvent; import javax.xml.bind.annotation.*; -import org.w3c.dom.Element; +import com.fasterxml.jackson.annotation.JsonProperty; -/** - *

- * Java class for anonymous complex type. - * - *

- * The following schema fragment specifies the expected content contained within this class. - * - *

- * <complexType>
- *   <complexContent>
- *     <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- *       <sequence>
- *         <element name="cambria.partition" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *         <element name="event-header" minOccurs="0">
- *           <complexType>
- *             <complexContent>
- *               <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
- *                 <sequence>
- *                   <element name="id" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="timestamp" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="source-name" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="domain" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="sequence-number" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="severity" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="event-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="version" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="action" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="top-entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="entity-link" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                   <element name="status" type="{http://www.w3.org/2001/XMLSchema}string"/>
- *                 </sequence>
- *               </restriction>
- *             </complexContent>
- *           </complexType>
- *         </element>
- *         <any processContents='lax' namespace='##other' minOccurs="0"/>
- *       </sequence>
- *     </restriction>
- *   </complexContent>
- * </complexType>
- * 
- * - * - */ +import lombok.Data; + +@Data @XmlAccessorType(XmlAccessType.FIELD) @XmlType(name = "", propOrder = {"cambriaPartition", "eventHeader", "entity"}) @XmlRootElement(name = "NotificationEvent") public class NotificationEvent { @XmlElement(name = "cambria.partition") + @JsonProperty("cambria.partition") protected String cambriaPartition; @XmlElement(name = "event-header") + @JsonProperty("event-header") protected EventHeader eventHeader; @XmlAnyElement(lax = true) protected Object entity; - /** - * Gets the value of the eventHeader property. - * - * @return - * possible object is - * {@link EventHeader } - * - */ - public EventHeader getEventHeader() { - return eventHeader; - } - - /** - * Sets the value of the eventHeader property. - * - * @param value - * allowed object is - * {@link EventHeader } - * - */ - public void setEventHeader(EventHeader value) { - this.eventHeader = value; - } - - /** - * Gets the value of the any property. - * - * @return - * possible object is - * {@link Object } - * {@link Element } - * - */ - public Object getEntity() { - return entity; - } - - /** - * Sets the value of the any property. - * - * @param value - * allowed object is - * {@link Object } - * {@link Element } - * - */ - public void setEntity(Object value) { - this.entity = value; - } - - /** - * Gets the value of the cambriaPartition property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getCambriaPartition() { - return cambriaPartition; - } - - /** - * Sets the value of the cambriaPartition property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setCambriaPartition(String value) { - this.cambriaPartition = value; - } - - /** - *

- * Java class for anonymous complex type. - * - *

- * The following schema fragment specifies the expected content contained within this class. - * - *

-     * <complexType>
-     *   <complexContent>
-     *     <restriction base="{http://www.w3.org/2001/XMLSchema}anyType">
-     *       <sequence>
-     *         <element name="id" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="timestamp" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="source-name" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="domain" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="sequence-number" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="severity" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="event-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="version" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="action" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="top-entity-type" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="entity-link" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *         <element name="status" type="{http://www.w3.org/2001/XMLSchema}string"/>
-     *       </sequence>
-     *     </restriction>
-     *   </complexContent>
-     * </complexType>
-     * 
- * - * - */ + @Data @XmlAccessorType(XmlAccessType.FIELD) @XmlType( name = "", propOrder = {"id", "timestamp", "sourceName", "domain", "sequenceNumber", "severity", "eventType", - "version", "action", "entityType", "topEntityType", "entityLink", "status"}) + "version", "action", "entityType", "topEntityType", "entityLink"}) public static class EventHeader { @XmlElement(required = true) @@ -206,340 +53,31 @@ public class NotificationEvent { @XmlElement(required = true) protected String timestamp; @XmlElement(name = "source-name", required = true) + @JsonProperty("source-name") protected String sourceName; @XmlElement(required = true) protected String domain; @XmlElement(name = "sequence-number", required = true) + @JsonProperty("sequence-number") protected String sequenceNumber; @XmlElement(required = true) protected String severity; @XmlElement(name = "event-type", required = true) + @JsonProperty("event-type") protected String eventType; @XmlElement(required = true) protected String version; @XmlElement(required = true) protected String action; @XmlElement(name = "entity-type", required = true) + @JsonProperty("entity-type") protected String entityType; @XmlElement(name = "top-entity-type", required = true) + @JsonProperty("top-entity-type") protected String topEntityType; @XmlElement(name = "entity-link", required = true) + @JsonProperty("entity-link") protected String entityLink; - @XmlElement(required = true) - protected String status; - - /** - * Gets the value of the id property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getId() { - return id; - } - - /** - * Sets the value of the id property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setId(String value) { - this.id = value; - } - - /** - * Gets the value of the timestamp property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getTimestamp() { - return timestamp; - } - - /** - * Sets the value of the timestamp property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setTimestamp(String value) { - this.timestamp = value; - } - - /** - * Gets the value of the sourceName property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getSourceName() { - return sourceName; - } - - /** - * Sets the value of the sourceName property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setSourceName(String value) { - this.sourceName = value; - } - - /** - * Gets the value of the domain property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getDomain() { - return domain; - } - - /** - * Sets the value of the domain property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setDomain(String value) { - this.domain = value; - } - - /** - * Gets the value of the sequenceNumber property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getSequenceNumber() { - return sequenceNumber; - } - - /** - * Sets the value of the sequenceNumber property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setSequenceNumber(String value) { - this.sequenceNumber = value; - } - - /** - * Gets the value of the severity property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getSeverity() { - return severity; - } - - /** - * Sets the value of the severity property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setSeverity(String value) { - this.severity = value; - } - - /** - * Gets the value of the eventType property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getEventType() { - return eventType; - } - - /** - * Sets the value of the eventType property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setEventType(String value) { - this.eventType = value; - } - - /** - * Gets the value of the version property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getVersion() { - return version; - } - - /** - * Sets the value of the version property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setVersion(String value) { - this.version = value; - } - - /** - * Gets the value of the action property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getAction() { - return action; - } - - /** - * Sets the value of the action property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setAction(String value) { - this.action = value; - } - - /** - * Gets the value of the entityType property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getEntityType() { - return entityType; - } - - /** - * Sets the value of the entityType property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setEntityType(String value) { - this.entityType = value; - } - - /** - * Gets the value of the topEntityType property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getTopEntityType() { - return topEntityType; - } - - /** - * Sets the value of the topEntityType property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setTopEntityType(String value) { - this.topEntityType = value; - } - - /** - * Gets the value of the entityLink property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getEntityLink() { - return entityLink; - } - - /** - * Sets the value of the entityLink property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setEntityLink(String value) { - this.entityLink = value; - } - - /** - * Gets the value of the status property. - * - * @return - * possible object is - * {@link String } - * - */ - public String getStatus() { - return status; - } - - /** - * Sets the value of the status property. - * - * @param value - * allowed object is - * {@link String } - * - */ - public void setStatus(String value) { - this.status = value; - } - } } diff --git a/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java index d6a491ef..09fc68a2 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java @@ -22,6 +22,12 @@ package org.onap.aai.kafka; import org.json.JSONObject; +/** + * MessageProducer interface based on untyped messages + * + * @deprecated use {@link org.onap.aai.kafka.NotificationProducer} instead + */ +@Deprecated public interface MessageProducer { void sendMessageToDefaultDestination(JSONObject finalJson); diff --git a/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java new file mode 100644 index 00000000..3c739174 --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducer.java @@ -0,0 +1,31 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2024 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.kafka; + +import org.onap.aai.domain.notificationEvent.NotificationEvent; +import org.onap.aai.rest.notification.UEBNotification; +import org.springframework.stereotype.Service; + +@Service +public interface NotificationProducer { + public void sendNotification(NotificationEvent notificationEvent); + public void sendUEBNotification(UEBNotification uebNotification); +} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java new file mode 100644 index 00000000..44a03ba1 --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/NotificationProducerService.java @@ -0,0 +1,48 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2024 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.kafka; + +import org.onap.aai.domain.notificationEvent.NotificationEvent; +import org.onap.aai.rest.notification.UEBNotification; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor +public class NotificationProducerService implements NotificationProducer { + + private final KafkaTemplate kafkaTemplate; + @Value("${aai.notifications.enabled:true}") boolean notificationsEnabled; + + public void sendNotification(NotificationEvent notificationEvent) { + if(notificationsEnabled) { + kafkaTemplate.send("AAI-EVENT", notificationEvent); + } + } + + public void sendUEBNotification(UEBNotification uebNotification) { + uebNotification.getEvents().stream() + .forEach(this::sendNotification); + } +} diff --git a/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java b/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java index 093062a9..9d8f5fdb 100644 --- a/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java +++ b/aai-core/src/main/java/org/onap/aai/prevalidation/ValidationService.java @@ -22,8 +22,8 @@ package org.onap.aai.prevalidation; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import java.net.ConnectException; import java.net.SocketTimeoutException; @@ -41,9 +41,10 @@ import java.util.stream.Collectors; import javax.annotation.PostConstruct; import org.apache.http.conn.ConnectTimeoutException; +import org.onap.aai.domain.notificationEvent.NotificationEvent; +import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader; import org.onap.aai.exceptions.AAIException; -import org.onap.aai.introspection.Introspector; -import org.onap.aai.rest.notification.NotificationEvent; + import org.onap.aai.restclient.RestClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,22 +75,19 @@ public class ValidationService { static final String VALIDATION_HEALTH_ENDPOINT = "/v1/info"; private static final Logger LOGGER = LoggerFactory.getLogger(ValidationService.class); - private static final String ENTITY_TYPE = "entity-type"; - private static final String ACTION = "action"; - private static final String SOURCE_NAME = "source-name"; private static final String DELETE = "DELETE"; private final RestClient validationRestClient; private final String appName; private final Set validationNodeTypes; - private final Gson gson; - - private List exclusionList; + private final ObjectMapper mapper; + private final List exclusionList; public ValidationService(@Qualifier("validationRestClient") RestClient validationRestClient, @Value("${spring.application.name}") String appName, @Value("${validation.service.node-types}") String validationNodes, - @Value("${validation.service.exclusion-regexes:#{null}}") String exclusionRegexes) { + @Value("${validation.service.exclusion-regexes:#{null}}") String exclusionRegexes, + ObjectMapper mapper) { this.validationRestClient = validationRestClient; this.appName = appName; @@ -101,7 +99,7 @@ public class ValidationService { this.exclusionList = Arrays.stream(exclusionRegexes.split(",")).map(Pattern::compile).collect(Collectors.toList()); } - this.gson = new Gson(); + this.mapper = mapper; LOGGER.info("Successfully initialized the pre validation service"); } @@ -119,7 +117,7 @@ public class ValidationService { ResponseEntity healthCheckResponse = null; try { healthCheckResponse = - validationRestClient.execute(VALIDATION_HEALTH_ENDPOINT, HttpMethod.GET, httpHeaders, null); + validationRestClient.execute(VALIDATION_HEALTH_ENDPOINT, HttpMethod.GET, httpHeaders); } catch (Exception ex) { AAIException validationException = new AAIException("AAI_4021", ex); throw validationException; @@ -142,7 +140,7 @@ public class ValidationService { } for (NotificationEvent event : notificationEvents) { - Introspector eventHeader = event.getEventHeader(); + EventHeader eventHeader = event.getEventHeader(); if (eventHeader == null) { // Should I skip processing the request and let it continue // or fail the request and cause client impact @@ -156,10 +154,10 @@ public class ValidationService { if (isDelete(eventHeader)) { continue; } - String entityType = eventHeader.getValue(ENTITY_TYPE); + String entityType = eventHeader.getEntityType(); if (this.shouldValidate(entityType)) { - List violations = preValidate(event.getNotificationEvent()); + List violations = preValidate(event); if (!violations.isEmpty()) { AAIException aaiException = new AAIException("AAI_4019"); aaiException.getTemplateVars().addAll(violations); @@ -172,8 +170,8 @@ public class ValidationService { /** * Determine if event is of type delete */ - private boolean isDelete(Introspector eventHeader) { - String action = eventHeader.getValue(ACTION); + private boolean isDelete(EventHeader eventHeader) { + String action = eventHeader.getAction(); return DELETE.equalsIgnoreCase(action); } @@ -186,15 +184,15 @@ public class ValidationService { // Get the first notification and if the source of that notification // is in one of the regexes then we skip sending it to validation NotificationEvent notification = notificationEvents.get(0); - Introspector eventHeader = notification.getEventHeader(); + EventHeader eventHeader = notification.getEventHeader(); if (eventHeader != null) { - String source = eventHeader.getValue(SOURCE_NAME); + String source = eventHeader.getSourceName(); return exclusionList.stream().anyMatch(pattern -> pattern.matcher(source).matches()); } return false; } - public List preValidate(String body) throws AAIException { + public List preValidate(NotificationEvent notificationEvent) throws AAIException { Map httpHeaders = new HashMap<>(); httpHeaders.put("X-FromAppId", appName); httpHeaders.put("X-TransactionID", UUID.randomUUID().toString()); @@ -203,7 +201,8 @@ public class ValidationService { List violations = new ArrayList<>(); ResponseEntity responseEntity; try { - responseEntity = validationRestClient.execute(VALIDATION_ENDPOINT, HttpMethod.POST, httpHeaders, body); + String requestBody = mapper.writeValueAsString(notificationEvent); + responseEntity = validationRestClient.execute(VALIDATION_ENDPOINT, HttpMethod.POST, httpHeaders, requestBody); Object responseBody = responseEntity.getBody(); if (isSuccess(responseEntity)) { LOGGER.debug("Validation Service returned following response status code {} and body {}", @@ -242,8 +241,8 @@ public class ValidationService { private Validation getValidation(Object responseBody) { Validation validation = null; try { - validation = gson.fromJson(responseBody.toString(), Validation.class); - } catch (JsonSyntaxException jsonException) { + validation = mapper.readValue(responseBody.toString(), Validation.class); + } catch (JsonProcessingException jsonException) { LOGGER.warn("Unable to convert the response body {}", jsonException.getMessage()); } return validation; diff --git a/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java b/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java index c4f84396..dfebd6e7 100644 --- a/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java +++ b/aai-core/src/main/java/org/onap/aai/rest/db/HttpEntry.java @@ -130,7 +130,7 @@ public class HttpEntry { this.dbEngine = new JanusGraphDBEngine(queryStyle, loader); getDbEngine().startTransaction(); - this.notification = new UEBNotification(loader, loaderFactory, schemaVersions); + this.notification = new UEBNotification(loaderFactory, schemaVersions); if ("true".equals(AAIConfig.get("aai.notification.depth.all.enabled", "true"))) { this.notificationDepth = AAIProperties.MAXIMUM_DEPTH; } else { diff --git a/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java b/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java index 9c3dde15..c2770692 100644 --- a/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java +++ b/aai-core/src/main/java/org/onap/aai/rest/notification/NotificationService.java @@ -34,6 +34,7 @@ import org.onap.aai.db.props.AAIProperties; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.LoaderFactory; +import org.onap.aai.kafka.NotificationProducer; import org.onap.aai.prevalidation.ValidationService; import org.onap.aai.serialization.db.DBSerializer; import org.onap.aai.serialization.engines.query.QueryEngine; @@ -52,6 +53,7 @@ public class NotificationService { public static final Logger LOGGER = LoggerFactory.getLogger(NotificationService.class); private final ValidationService validationService; + private final NotificationProducer notificationProducer; private final LoaderFactory loaderFactory; private final boolean isDeltaEventsEnabled; private final String basePath; @@ -60,11 +62,13 @@ public class NotificationService { @Nullable ValidationService validationService, LoaderFactory loaderFactory, @Value("${schema.uri.base.path}") String basePath, - @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) { + @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled, + NotificationProducer notificationProducer) { this.validationService = validationService; this.loaderFactory = loaderFactory; this.basePath = basePath; this.isDeltaEventsEnabled = isDeltaEventsEnabled; + this.notificationProducer = notificationProducer; } /** @@ -99,7 +103,7 @@ public class NotificationService { validationService.validate(notification.getEvents()); } - notification.triggerEvents(); + notificationProducer.sendUEBNotification(notification); if (isDeltaEventsEnabled) { try { DeltaEvents deltaEvents = new DeltaEvents(transactionId, sourceOfTruth, schemaVersion.toString(), diff --git a/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java b/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java index a113cf14..d5803f9c 100644 --- a/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java +++ b/aai-core/src/main/java/org/onap/aai/rest/notification/UEBNotification.java @@ -4,6 +4,8 @@ * ================================================================================ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Modifications Copyright © 2024 Deutsche Telekom. + * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -30,6 +32,7 @@ import java.util.Map; import javax.ws.rs.core.Response.Status; +import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; @@ -41,59 +44,31 @@ import org.onap.aai.logging.LogFormatTools; import org.onap.aai.parsers.uri.URIToObject; import org.onap.aai.setup.SchemaVersion; import org.onap.aai.setup.SchemaVersions; +import org.onap.aai.util.AAIConfig; +import org.onap.aai.util.AAIConstants; +import org.onap.aai.util.FormatDate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * The Class UEBNotification. - */ public class UEBNotification { private static final Logger LOGGER = LoggerFactory.getLogger(UEBNotification.class); + private static final FormatDate FORMAT_DATE = new FormatDate("YYYYMMdd-HH:mm:ss:SSS"); + private static final String EVENT_TYPE = "AAI-EVENT"; - private Loader currentVersionLoader = null; - protected Map events = null; - private SchemaVersion notificationVersion = null; - - /** - * Instantiates a new UEB notification. - * - * @param loader the loader - */ - public UEBNotification(Loader loader, LoaderFactory loaderFactory, SchemaVersions schemaVersions) { - events = new LinkedHashMap<>(); - SchemaVersion defaultVersion = schemaVersions.getDefaultVersion(); - currentVersionLoader = loaderFactory.createLoaderForVersion(loader.getModelType(), defaultVersion); - notificationVersion = defaultVersion; - } + private final String domain = AAIConfig.get("aai.notificationEvent.default.domain", "UNK"); + private final String sequenceNumber = AAIConfig.get("aai.notificationEvent.default.sequenceNumber", "UNK"); + private final String severity = AAIConfig.get("aai.notificationEvent.default.severity", "UNK"); + private final Map events; + private final Loader currentVersionLoader; + private final SchemaVersion notificationVersion; - /** - * Instantiates a new UEB notification. - * - * @param modelType - Model type - * @param loaderFactory - the loader factory - * @param schemaVersions the schema versions bean - */ - public UEBNotification(ModelType modelType, LoaderFactory loaderFactory, SchemaVersions schemaVersions) { + public UEBNotification(LoaderFactory loaderFactory, SchemaVersions schemaVersions) { events = new LinkedHashMap<>(); - SchemaVersion defaultVersion = schemaVersions.getDefaultVersion(); - currentVersionLoader = loaderFactory.createLoaderForVersion(modelType, defaultVersion); - notificationVersion = defaultVersion; + notificationVersion = schemaVersions.getDefaultVersion(); + currentVersionLoader = loaderFactory.createLoaderForVersion(ModelType.MOXY, notificationVersion); } - /** - * Creates the notification event. - * - * @param transactionId the X-TransactionId - * @param sourceOfTruth - * @param status the status - * @param uri the uri - * @param obj the obj - * @param basePath base URI path - * @throws AAIException the AAI exception - * @throws IllegalArgumentException the illegal argument exception - * @throws UnsupportedEncodingException the unsupported encoding exception - */ public void createNotificationEvent(String transactionId, String sourceOfTruth, Status status, URI uri, Introspector obj, HashMap relatedObjects, String basePath) throws AAIException, UnsupportedEncodingException { @@ -102,25 +77,32 @@ public class UEBNotification { try { EntityConverter entityConverter = new EntityConverter(new URIToObject(currentVersionLoader, uri, relatedObjects)); - Introspector eventHeader = currentVersionLoader.introspectorFromName("notification-event-header"); + EventHeader eventHeader = new EventHeader(); basePath = formatBasePath(basePath); - String entityLink = formatEntityLink(uri, basePath); - - eventHeader.setValue("entity-link", entityLink); - eventHeader.setValue("action", action); - eventHeader.setValue("entity-type", obj.getDbName()); - eventHeader.setValue("top-entity-type", entityConverter.getTopEntityName()); - eventHeader.setValue("source-name", sourceOfTruth); - eventHeader.setValue("version", notificationVersion.toString()); - eventHeader.setValue("id", transactionId); - - - Introspector eventObject = entityConverter.convert(obj); - - final NotificationEvent event = - new NotificationEvent(currentVersionLoader, eventHeader, eventObject, transactionId, sourceOfTruth); + eventHeader.setEntityLink(entityLink); + eventHeader.setAction(action); + eventHeader.setEntityType(obj.getDbName()); + eventHeader.setTopEntityType(entityConverter.getTopEntityName()); + eventHeader.setSourceName(sourceOfTruth); + eventHeader.setVersion(notificationVersion.toString()); + eventHeader.setId(transactionId); + + // default values + eventHeader.setTimestamp(FORMAT_DATE.getDateTime()); + eventHeader.setEventType(EVENT_TYPE); + eventHeader.setDomain(domain); + eventHeader.setSequenceNumber(sequenceNumber); + eventHeader.setSeverity(severity); + + Introspector entity = entityConverter.convert(obj); + + final org.onap.aai.domain.notificationEvent.NotificationEvent event = + new org.onap.aai.domain.notificationEvent.NotificationEvent(); + event.setEventHeader(eventHeader); + event.setCambriaPartition(AAIConstants.UEB_PUB_PARTITION_AAI); + event.setEntity(entity); events.put(uri.toString(), event); } catch (AAIUnknownObjectException e) { throw new RuntimeException("Fatal error - notification-event-header object not found!"); @@ -172,26 +154,10 @@ public class UEBNotification { return action; } - /** - * Trigger events. - * - * @throws AAIException the AAI exception - */ - public void triggerEvents() throws AAIException { - for (NotificationEvent event : events.values()) { - event.trigger(); - } - clearEvents(); - } - - public List getEvents() { + public List getEvents() { return new ArrayList<>(this.events.values()); } - public Map getEventsMap() { - return this.events; - } - private String getUri(String uri, String basePath) { if (uri == null || uri.isEmpty()) { return ""; diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java index c1e8357d..d201134f 100644 --- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -36,6 +36,7 @@ import org.onap.aai.introspection.LoaderFactory; import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; import org.onap.aai.kafka.AAIKafkaEventJMSProducer; import org.onap.aai.kafka.MessageProducer; +import org.onap.aai.kafka.NotificationProducer; import org.onap.aai.rest.notification.NotificationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -147,8 +148,8 @@ public class KafkaConfig { } else { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); } - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); @@ -179,7 +180,7 @@ public class KafkaConfig { @ConditionalOnMissingBean public NotificationService notificationService(LoaderFactory loaderFactory, @Value("${schema.uri.base.path}") String basePath, - @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) { - return new NotificationService(null, loaderFactory, basePath, isDeltaEventsEnabled); + @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled, NotificationProducer notificationProducer) { + return new NotificationService(null, loaderFactory, basePath, isDeltaEventsEnabled, notificationProducer); } } diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java new file mode 100644 index 00000000..09d8b26c --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaNotificationEventConfig.java @@ -0,0 +1,111 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2024 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.web; + +import java.util.Map; +import java.util.HashMap; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.onap.aai.domain.notificationEvent.NotificationEvent; +import org.onap.aai.kafka.NotificationProducer; +import org.onap.aai.kafka.NotificationProducerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaNotificationEventConfig { + + private static final Logger logger = LoggerFactory.getLogger(KafkaNotificationEventConfig.class); + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config:#{null}}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries:3}") + private String retries; + + private Map buildKafkaProperties() throws Exception { + Map props = new HashMap<>(); + if (bootstrapServers == null) { + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } else { + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.RETRIES_CONFIG, retries); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + + if (saslJaasConfig == null) { + logger.info("Not using any authentication for kafka interaction"); + } else { + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + } + return props; + } + + @Bean + public ProducerFactory notificationEventProducerFactory() throws Exception { + Map props = buildKafkaProperties(); + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaNotificationEventTemplate(ProducerFactory producerFactory) throws Exception { + try { + + return new KafkaTemplate<>(producerFactory); + } catch (Exception e) { + String smth = ""; + return null; + } + } + + @Bean + public NotificationProducer notificationProducer(KafkaTemplate kafkaTemplate) { + return new NotificationProducerService(kafkaTemplate); + } +} -- cgit 1.2.3-korg