From dff80ed76c8e0e6416e0688541f3094db3ca260a Mon Sep 17 00:00:00 2001 From: Singh Date: Tue, 9 Apr 2024 17:38:07 +0530 Subject: Remove DMaaP dependency from AAI-Common - Remove Dmaap dependency in AAI-Common and replace it with Kafka. Issue-ID: AAI-3792 Change-Id: If3fd5c3bdc2448f7e260a26000b02a510c80d7fb Signed-off-by: Singh --- .../org/onap/aai/schema/enums/ObjectMetadata.java | 2 +- aai-core/pom.xml | 21 +++ .../onap/aai/dmaap/AAIDmaapEventJMSConsumer.java | 146 ----------------- .../onap/aai/dmaap/AAIDmaapEventJMSProducer.java | 65 -------- .../java/org/onap/aai/dmaap/MessageProducer.java | 30 ---- .../onap/aai/kafka/AAIKafkaEventJMSConsumer.java | 135 ++++++++++++++++ .../onap/aai/kafka/AAIKafkaEventJMSProducer.java | 65 ++++++++ .../java/org/onap/aai/kafka/MessageProducer.java | 30 ++++ .../org/onap/aai/util/StoreNotificationEvent.java | 16 +- .../java/org/onap/aai/util/delta/DeltaEvents.java | 6 +- .../main/java/org/onap/aai/web/DmaapConfig.java | 125 --------------- .../org/onap/aai/web/EventClientPublisher.java | 95 ----------- .../main/java/org/onap/aai/web/KafkaConfig.java | 175 +++++++++++++++++++++ aai-core/src/main/resources/logback.xml | 26 +-- .../aai/kafka/AAIKafkaEventJMSConsumerTest.java | 89 +++++++++++ .../onap/aai/util/StoreNotificationEventTest.java | 8 +- aai-core/src/test/resources/logback.xml | 26 +-- .../resources/payloads/resource/aai-event.json | 64 ++++++++ .../payloads/resource/aai-invalid-event.json | 64 ++++++++ .../onap/aai/aailog/logs/AaiDmaapMetricLog.java | 2 +- .../onap/logging/filter/base/ONAPComponents.java | 7 +- .../src/test/resources/error.properties | 4 +- aai-parent/pom.xml | 10 +- docs/AAI REST API Documentation/AAIRESTAPI.rst | 4 +- 24 files changed, 694 insertions(+), 521 deletions(-) delete mode 100644 aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java delete mode 100644 aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java delete mode 100644 aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java create mode 100644 aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java create mode 100644 aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java create mode 100644 aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java delete mode 100644 aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java delete mode 100644 aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java create mode 100644 aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java create mode 100644 aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java create mode 100644 aai-core/src/test/resources/payloads/resource/aai-event.json create mode 100644 aai-core/src/test/resources/payloads/resource/aai-invalid-event.json diff --git a/aai-annotations/src/main/java/org/onap/aai/schema/enums/ObjectMetadata.java b/aai-annotations/src/main/java/org/onap/aai/schema/enums/ObjectMetadata.java index 406800fd..3443d32f 100644 --- a/aai-annotations/src/main/java/org/onap/aai/schema/enums/ObjectMetadata.java +++ b/aai-annotations/src/main/java/org/onap/aai/schema/enums/ObjectMetadata.java @@ -28,7 +28,7 @@ public enum ObjectMetadata { DESCRIPTION("description"), /** * names of properties to appear in relationship-lists - * and parent objects in DMaaP messages + * and parent objects in Kafka messages *
* comma separated list */ diff --git a/aai-core/pom.xml b/aai-core/pom.xml index fb646692..e48e8f58 100644 --- a/aai-core/pom.xml +++ b/aai-core/pom.xml @@ -135,6 +135,17 @@ limitations under the License. commons-text compile + + org.junit.vintage + junit-vintage-engine + test + + + org.hamcrest + hamcrest-core + + + com.att.eelf eelf-core @@ -190,6 +201,16 @@ limitations under the License. com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider + + org.springframework.kafka + spring-kafka + 2.7.14 + + + org.springframework.kafka + spring-kafka-test + test + com.googlecode.json-simple json-simple diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java deleted file mode 100644 index d3addebb..00000000 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Modifications Copyright © 2018 IBM. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.dmaap; - -import java.util.Map; -import java.util.Objects; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.json.JSONException; -import org.json.JSONObject; -import org.onap.aai.aailog.logs.AaiDmaapMetricLog; -import org.onap.aai.exceptions.AAIException; -import org.onap.aai.logging.AaiElsErrorCode; -import org.onap.aai.logging.ErrorLogHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.core.env.Environment; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.web.client.RestTemplate; - -public class AAIDmaapEventJMSConsumer implements MessageListener { - - private static final String EVENT_TOPIC = "event-topic"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class); - - private RestTemplate restTemplate; - - private HttpHeaders httpHeaders; - - private Environment environment; - private Map mdcCopy; - - public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) { - super(); - mdcCopy = MDC.getCopyOfContextMap(); - Objects.nonNull(environment); - Objects.nonNull(restTemplate); - Objects.nonNull(httpHeaders); - this.environment = environment; - this.restTemplate = restTemplate; - this.httpHeaders = httpHeaders; - } - - @Override - public void onMessage(Message message) { - - if (restTemplate == null) { - return; - } - - String jsmMessageTxt = ""; - String aaiEvent = ""; - JSONObject aaiEventHeader; - JSONObject joPayload; - String transactionId = ""; - String serviceName = ""; - String eventName = ""; - String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; - String errorDescription = ""; - - if (mdcCopy != null) { - MDC.setContextMap(mdcCopy); - } - - if (message instanceof TextMessage) { - AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); - try { - jsmMessageTxt = ((TextMessage) message).getText(); - JSONObject jo = new JSONObject(jsmMessageTxt); - if (jo.has("aaiEventPayload")) { - joPayload = jo.getJSONObject("aaiEventPayload"); - aaiEvent = joPayload.toString(); - } else { - return; - } - if (jo.getString(EVENT_TOPIC) != null) { - eventName = jo.getString(EVENT_TOPIC); - } - if (joPayload.has("event-header")) { - try { - aaiEventHeader = joPayload.getJSONObject("event-header"); - if (aaiEventHeader.has("id")) { - transactionId = aaiEventHeader.get("id").toString(); - } - if (aaiEventHeader.has("entity-link")) { - serviceName = aaiEventHeader.get("entity-link").toString(); - } - } catch (JSONException jexc) { - // ignore, this is just used for logging - } - } - metricLog.pre(eventName, aaiEvent, transactionId, serviceName); - - HttpEntity httpEntity = new HttpEntity(aaiEvent, httpHeaders); - - String transportType = environment.getProperty("dmaap.ribbon.transportType", "http"); - String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers"); - String endpoint = "/events/" + eventName; - - if ("AAI-EVENT".equals(eventName)) { - restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); - } else { - LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); - } - } catch (JMSException | JSONException e) { - aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7350")); - } catch (Exception e) { - aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); - } finally { - metricLog.post(aaiElsErrorCode, errorDescription); - } - } - } -} diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java deleted file mode 100644 index 3d675efe..00000000 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Modifications Copyright © 2018 IBM. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.dmaap; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQQueue; -import org.json.JSONObject; -import org.onap.aai.util.AAIConfig; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; - -public class AAIDmaapEventJMSProducer implements MessageProducer { - - private JmsTemplate jmsTemplate; - - public AAIDmaapEventJMSProducer() { - if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { - this.jmsTemplate = new JmsTemplate(); - String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); - this.jmsTemplate - .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl))); - this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); - } - } - - public void sendMessageToDefaultDestination(JSONObject finalJson) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(finalJson.toString()); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } - } - - public void sendMessageToDefaultDestination(String msg) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(msg); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } - } -} diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java deleted file mode 100644 index da70d505..00000000 --- a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.dmaap; - -import org.json.JSONObject; - -public interface MessageProducer { - - void sendMessageToDefaultDestination(JSONObject finalJson); - - void sendMessageToDefaultDestination(String msg); -} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java new file mode 100644 index 00000000..731f3dfc --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java @@ -0,0 +1,135 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.kafka; + + import java.util.Map; + import java.util.Objects; + + import javax.jms.JMSException; + import javax.jms.Message; + import javax.jms.MessageListener; + import javax.jms.TextMessage; + + import org.json.JSONException; + import org.json.JSONObject; + import org.onap.aai.aailog.logs.AaiDmaapMetricLog; + import org.onap.aai.exceptions.AAIException; + import org.onap.aai.logging.AaiElsErrorCode; + import org.onap.aai.logging.ErrorLogHelper; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.MDC; + import org.springframework.core.env.Environment; + import org.springframework.kafka.core.KafkaTemplate; + + public class AAIKafkaEventJMSConsumer implements MessageListener { + + private static final String EVENT_TOPIC = "event-topic"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); + + private Environment environment; + private Map mdcCopy; + private KafkaTemplate kafkaTemplate; + + public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate kafkaTemplate) { + super(); + mdcCopy = MDC.getCopyOfContextMap(); + Objects.nonNull(environment); + this.environment = environment; + this.kafkaTemplate=kafkaTemplate; + } + + @Override + public void onMessage(Message message) { + + if (kafkaTemplate == null) { + return; + } + + String jsmMessageTxt = ""; + String aaiEvent = ""; + JSONObject aaiEventHeader; + JSONObject joPayload; + String transactionId = ""; + String serviceName = ""; + String eventName = ""; + String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; + String errorDescription = ""; + + if (mdcCopy != null) { + MDC.setContextMap(mdcCopy); + } + + if (message instanceof TextMessage) { + AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); + try { + jsmMessageTxt = ((TextMessage) message).getText(); + JSONObject jo = new JSONObject(jsmMessageTxt); + if (jo.has("aaiEventPayload")) { + joPayload = jo.getJSONObject("aaiEventPayload"); + aaiEvent = joPayload.toString(); + } else { + return; + } + if (jo.getString(EVENT_TOPIC) != null) { + eventName = jo.getString(EVENT_TOPIC); + } + if (joPayload.has("event-header")) { + try { + aaiEventHeader = joPayload.getJSONObject("event-header"); + if (aaiEventHeader.has("id")) { + transactionId = aaiEventHeader.get("id").toString(); + } + if (aaiEventHeader.has("entity-link")) { + serviceName = aaiEventHeader.get("entity-link").toString(); + } + } catch (JSONException jexc) { + // ignore, this is just used for logging + } + } + metricLog.pre(eventName, aaiEvent, transactionId, serviceName); + + + if ("AAI-EVENT".equals(eventName)) { + // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); + kafkaTemplate.send(eventName,aaiEvent); + + } else { + LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); + } + } catch (JMSException | JSONException e) { + aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7350")); + } catch (Exception e) { + aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); + } finally { + metricLog.post(aaiElsErrorCode, errorDescription); + } + } + } + } + \ No newline at end of file diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java new file mode 100644 index 00000000..00cf677f --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -0,0 +1,65 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.kafka; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.json.JSONObject; +import org.onap.aai.util.AAIConfig; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; + +public class AAIKafkaEventJMSProducer implements MessageProducer { + + private JmsTemplate jmsTemplate; + + public AAIKafkaEventJMSProducer() { + if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { + this.jmsTemplate = new JmsTemplate(); + String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); + this.jmsTemplate + .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl))); + this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); + } + } + + public void sendMessageToDefaultDestination(JSONObject finalJson) { + if (jmsTemplate != null) { + jmsTemplate.convertAndSend(finalJson.toString()); + CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); + if (ccf != null) { + ccf.destroy(); + } + } + } + + public void sendMessageToDefaultDestination(String msg) { + if (jmsTemplate != null) { + jmsTemplate.convertAndSend(msg); + CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); + if (ccf != null) { + ccf.destroy(); + } + } + } +} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java new file mode 100644 index 00000000..d6a491ef --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java @@ -0,0 +1,30 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.kafka; + +import org.json.JSONObject; + +public interface MessageProducer { + + void sendMessageToDefaultDestination(JSONObject finalJson); + + void sendMessageToDefaultDestination(String msg); +} diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java index 598ef231..6f3e8883 100644 --- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java @@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; import org.json.JSONException; import org.json.JSONObject; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; import org.onap.aai.domain.notificationEvent.NotificationEvent; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; import org.onap.aai.introspection.exceptions.AAIUnknownObjectException; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -59,12 +59,12 @@ public class StoreNotificationEvent { * Instantiates a new store notification event. */ public StoreNotificationEvent(String transactionId, String sourceOfTruth) { - this.messageProducer = new AAIDmaapEventJMSProducer(); + this.messageProducer = new AAIKafkaEventJMSProducer(); this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; } - public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) { + public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) { this.messageProducer = producer; this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; @@ -139,7 +139,7 @@ public class StoreNotificationEvent { try { PojoUtils pu = new PojoUtils(); String entityJson = pu.getJsonFromObject(ne); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -227,7 +227,7 @@ public class StoreNotificationEvent { marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false); marshaller.marshal(notificationEvent, result); - this.sendToDmaapJmsQueue(result.toString()); + this.sendToKafkaJmsQueue(result.toString()); } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -380,7 +380,7 @@ public class StoreNotificationEvent { notificationEvent.setValue("entity", obj.getUnderlyingObject()); String entityJson = notificationEvent.marshal(false); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (JSONException e) { throw new AAIException("AAI_7350", e); @@ -389,7 +389,7 @@ public class StoreNotificationEvent { } } - private void sendToDmaapJmsQueue(String entityString) throws JSONException { + private void sendToKafkaJmsQueue(String entityString) throws JSONException { JSONObject entityJsonObject = new JSONObject(entityString); diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java index 10d47cd6..6fbc297b 100644 --- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java +++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java @@ -28,8 +28,8 @@ import java.util.Date; import java.util.Map; import org.onap.aai.db.props.AAIProperties; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.onap.aai.util.AAIConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class DeltaEvents { private MessageProducer messageProducer; public DeltaEvents(String transId, String sourceName, String schemaVersion, Map objectDeltas) { - this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer()); + this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer()); } public DeltaEvents(String transId, String sourceName, String schemaVersion, Map objectDeltas, diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java deleted file mode 100644 index 078ca963..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import javax.annotation.PostConstruct; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; -import org.springframework.http.HttpHeaders; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.listener.DefaultMessageListenerContainer; -import org.springframework.web.client.RestTemplate; - -@Profile("dmaap") -@Configuration -public class DmaapConfig { - - @Autowired - private ApplicationContext ctx; - - @Autowired - @Qualifier("dmaapRestTemplate") - private RestTemplate dmaapRestTemplate; - - @Autowired - @Qualifier("dmaapHeaders") - private HttpHeaders dmaapHeaders; - - @Value("${jms.bind.address}") - private String bindAddress; - - @PostConstruct - public void init() { - System.setProperty("activemq.tcp.url", bindAddress); - } - - @Bean(destroyMethod = "stop") - public BrokerService brokerService() throws Exception { - - BrokerService broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setSchedulerSupport(false); - broker.start(); - - return broker; - } - - @Bean(name = "connectionFactory") - public ActiveMQConnectionFactory activeMQConnectionFactory() { - return new ActiveMQConnectionFactory(bindAddress); - } - - @Bean - public CachingConnectionFactory cachingConnectionFactory() { - return new CachingConnectionFactory(activeMQConnectionFactory()); - } - - @Bean(name = "destinationQueue") - public ActiveMQQueue activeMQQueue() { - return new ActiveMQQueue("IN_QUEUE"); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - - jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); - jmsTemplate.setDefaultDestination(activeMQQueue()); - - return jmsTemplate; - } - - @Bean - public AAIDmaapEventJMSProducer jmsProducer() { - return new AAIDmaapEventJMSProducer(); - } - - @Bean(name = "jmsConsumer") - public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception { - return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders); - } - - @Bean - public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { - - DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); - - messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); - messageListenerContainer.setDestinationName("IN_QUEUE"); - messageListenerContainer.setMessageListener(jmsConsumer()); - - return messageListenerContainer; - } -} diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java deleted file mode 100644 index cbd9b105..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import java.io.UnsupportedEncodingException; -import java.util.Base64; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.web.client.RestTemplate; - -@Configuration -public class EventClientPublisher { - - private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class); - - @Value("${dmaap.ribbon.listOfServers:}") - private String hosts; - - @Value("${dmaap.ribbon.username:}") - private String username; - - @Value("${dmaap.ribbon.password:}") - private String password; - - @Value("${dmaap.ribbon.topic:AAI-EVENT}") - private String topic; - - @Value("${dmaap.ribbon.batchSize:100}") - private int maxBatchSize; - - @Value("${dmaap.ribbon.maxAgeMs:250}") - private int maxAgeMs; - - @Value("${dmaap.ribbon.delayBetweenBatches:100}") - private int delayBetweenBatches; - - @Value("${dmaap.ribbon.protocol:http}") - private String protocol; - - @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}") - private String tranportType; - - @Value("${dmaap.ribbon.contentType:application/json}") - private String contentType; - - @Bean(name = "dmaapRestTemplate") - public RestTemplate dmaapRestTemplate() { - return new RestTemplate(); - } - - @Bean(name = "dmaapHeaders") - public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - - if (username != null && password != null) { - - if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) { - - byte[] userPass = (username + ":" + password).getBytes("UTF-8"); - - httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass)); - } - } - - return httpHeaders; - } - -} diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java new file mode 100644 index 00000000..71ae5b6b --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -0,0 +1,175 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.web; + + import java.util.HashMap; + import java.util.Map; + + import javax.annotation.PostConstruct; + + import org.apache.activemq.ActiveMQConnectionFactory; + import org.apache.activemq.broker.BrokerService; + import org.apache.activemq.command.ActiveMQQueue; + import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.beans.factory.annotation.Value; + import org.springframework.context.ApplicationContext; + import org.springframework.context.annotation.Bean; + import org.springframework.context.annotation.Configuration; + import org.springframework.context.annotation.Profile; + import org.springframework.jms.connection.CachingConnectionFactory; + import org.springframework.jms.core.JmsTemplate; + import org.springframework.jms.listener.DefaultMessageListenerContainer; + import org.springframework.kafka.core.DefaultKafkaProducerFactory; + import org.springframework.kafka.core.KafkaTemplate; + import org.springframework.kafka.core.ProducerFactory; + + @Profile("kafka") + @Configuration + public class KafkaConfig { + + @Autowired + private ApplicationContext ctx; + + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private Integer retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @Bean(name = "connectionFactory") + public ActiveMQConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + public CachingConnectionFactory cachingConnectionFactory() { + return new CachingConnectionFactory(activeMQConnectionFactory()); + } + + @Bean(name = "destinationQueue") + public ActiveMQQueue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); + jmsTemplate.setDefaultDestination(activeMQQueue()); + + return jmsTemplate; + } + + @Bean + public AAIKafkaEventJMSProducer jmsProducer() { + return new AAIKafkaEventJMSProducer(); + } + + @Bean(name = "jmsConsumer") + public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { + return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(jmsConsumer()); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory producerFactory() throws Exception { + Map props = new HashMap<>(); + if(bootstrapServers == null){ + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } + else{ + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + if(saslJaasConfig == null){ + logger.info("Not using any authentication for kafka interaction"); + } + else{ + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate() throws Exception { + return new KafkaTemplate<>(producerFactory()); + } + } + \ No newline at end of file diff --git a/aai-core/src/main/resources/logback.xml b/aai-core/src/main/resources/logback.xml index f2b1399f..fc66b32e 100644 --- a/aai-core/src/main/resources/logback.xml +++ b/aai-core/src/main/resources/logback.xml @@ -173,14 +173,14 @@ - WARN - ${logDirectory}/dmaapAAIEventConsumer/error.log + ${logDirectory}/kafkaAAIEventConsumer/error.log - ${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd} @@ -188,32 +188,32 @@ - DEBUG ACCEPT DENY - ${logDirectory}/dmaapAAIEventConsumer/debug.log + ${logDirectory}/kafkaAAIEventConsumer/debug.log - ${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd} ${eelfLogPattern} - INFO ACCEPT DENY - ${logDirectory}/dmaapAAIEventConsumer/metrics.log + ${logDirectory}/kafkaAAIEventConsumer/metrics.log - ${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} @@ -365,10 +365,10 @@ - - - - + + + + diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java new file mode 100644 index 00000000..c72499c4 --- /dev/null +++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java @@ -0,0 +1,89 @@ +package org.onap.aai.kafka; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javax.jms.TextMessage; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.aai.PayloadUtil; +import org.springframework.core.env.Environment; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.util.ReflectionTestUtils; + +@RunWith(MockitoJUnitRunner.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +public class AAIKafkaEventJMSConsumerTest { + + @Mock + private Environment environment; + + @Mock + private KafkaTemplate kafkaTemplate; + + private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer; + + @Before + public void setUp(){ + aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate); + } + + @Test + public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived() + throws Exception + { + TextMessage mockTextMessage = mock(TextMessage.class); + String payload = PayloadUtil.getResourcePayload("aai-event.json"); + + when(mockTextMessage.getText()).thenReturn(payload); + aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); + verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString()); + } + + @Test + public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{ + TextMessage mockTextMessage = mock(TextMessage.class); + String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json"); + when(mockTextMessage.getText()).thenReturn(payload); + aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); + } + + + @Test + public void onMessage_shouldHandleJSONException() throws Exception { + // Arrange + AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); + TextMessage mockTextMessage = mock(TextMessage.class); + ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate + + // Act + consumer.onMessage(mockTextMessage); + + // Assert + // Verify that exception is logged + } + + @Test + public void onMessage_shouldHandleGenericException() throws Exception { + // Arrange + AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); + TextMessage mockTextMessage = mock(TextMessage.class); + when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields + + // Act + consumer.onMessage(mockTextMessage); + + // Assert + // Verify that exception is logged + } + +} diff --git a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java index b4b8810e..a0c3f639 100644 --- a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java +++ b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java @@ -39,27 +39,27 @@ import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.onap.aai.AAISetup; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; import org.onap.aai.introspection.ModelType; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; public class StoreNotificationEventTest extends AAISetup { - private static AAIDmaapEventJMSProducer producer; + private static AAIKafkaEventJMSProducer producer; private static StoreNotificationEvent sne; @BeforeClass public static void setUp() { - producer = Mockito.mock(AAIDmaapEventJMSProducer.class); + producer = Mockito.mock(AAIKafkaEventJMSProducer.class); // sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth"); } @Before public void setUpBefore() { - producer = Mockito.mock(AAIDmaapEventJMSProducer.class); + producer = Mockito.mock(AAIKafkaEventJMSProducer.class); sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth"); } diff --git a/aai-core/src/test/resources/logback.xml b/aai-core/src/test/resources/logback.xml index 5d4f7bf3..4c82c0bf 100644 --- a/aai-core/src/test/resources/logback.xml +++ b/aai-core/src/test/resources/logback.xml @@ -170,14 +170,14 @@ - WARN - ${logDirectory}/dmaapAAIEventConsumer/error.log + ${logDirectory}/kafkaAAIEventConsumer/error.log - ${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd} @@ -185,32 +185,32 @@ - DEBUG ACCEPT DENY - ${logDirectory}/dmaapAAIEventConsumer/debug.log + ${logDirectory}/kafkaAAIEventConsumer/debug.log - ${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd} ${eelfLogPattern} - INFO ACCEPT DENY - ${logDirectory}/dmaapAAIEventConsumer/metrics.log + ${logDirectory}/kafkaAAIEventConsumer/metrics.log - ${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} + ${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} @@ -362,10 +362,10 @@ - - - - + + + + diff --git a/aai-core/src/test/resources/payloads/resource/aai-event.json b/aai-core/src/test/resources/payloads/resource/aai-event.json new file mode 100644 index 00000000..0fab96da --- /dev/null +++ b/aai-core/src/test/resources/payloads/resource/aai-event.json @@ -0,0 +1,64 @@ +{ + "event-topic": "AAI-EVENT", + "aaiEventPayload": { + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "object-group", + "top-entity-type": "object-group", + "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "UPDATE", + "sequence-number": "0", + "id": "23f12123-c326-48a7-b57e-e48746c295ea", + "source-name": "postman-api", + "version": "v28", + "timestamp": "20231207-12:14:44:757" + }, + "entity": { + "relationship-list": { + "relationship": [ + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193273958916", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193273958916", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913" + } + ] + }, + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193272330241", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193272330241", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803" + } + ] + } + ] + }, + "group-name": "Urban", + "resource-version": "1701951284582", + "group-type": "cell", + "object-group-id": "ric_cluster" + } + } +} \ No newline at end of file diff --git a/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json new file mode 100644 index 00000000..77b2fc1f --- /dev/null +++ b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json @@ -0,0 +1,64 @@ +{ + "event-topic": "AAI-INVALID-EVENT", + "aaiEventPayload": { + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "object-group", + "top-entity-type": "object-group", + "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "UPDATE", + "sequence-number": "0", + "id": "23f12123-c326-48a7-b57e-e48746c295ea", + "source-name": "postman-api", + "version": "v28", + "timestamp": "20231207-12:14:44:757" + }, + "entity": { + "relationship-list": { + "relationship": [ + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193273958916", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193273958916", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913" + } + ] + }, + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193272330241", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193272330241", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803" + } + ] + } + ] + }, + "group-name": "Urban", + "resource-version": "1701951284582", + "group-type": "cell", + "object-group-id": "ric_cluster" + } + } +} \ No newline at end of file diff --git a/aai-els-onap-logging/src/main/java/org/onap/aai/aailog/logs/AaiDmaapMetricLog.java b/aai-els-onap-logging/src/main/java/org/onap/aai/aailog/logs/AaiDmaapMetricLog.java index 0d3a573d..8190dd10 100644 --- a/aai-els-onap-logging/src/main/java/org/onap/aai/aailog/logs/AaiDmaapMetricLog.java +++ b/aai-els-onap-logging/src/main/java/org/onap/aai/aailog/logs/AaiDmaapMetricLog.java @@ -33,7 +33,7 @@ public class AaiDmaapMetricLog extends MDCSetup { protected static final Logger logger = LoggerFactory.getLogger(AaiDmaapMetricLog.class); private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE-RETURN"); - private static final String TARGET_ENTITY = "DMaaP"; + private static final String TARGET_ENTITY = "KAFKA"; public AaiDmaapMetricLog() { if (MDC.get(ONAPLogConstants.MDCs.SERVER_FQDN) == null) { diff --git a/aai-els-onap-logging/src/main/java/org/onap/logging/filter/base/ONAPComponents.java b/aai-els-onap-logging/src/main/java/org/onap/logging/filter/base/ONAPComponents.java index 5465b7ce..b9cdd0e9 100644 --- a/aai-els-onap-logging/src/main/java/org/onap/logging/filter/base/ONAPComponents.java +++ b/aai-els-onap-logging/src/main/java/org/onap/logging/filter/base/ONAPComponents.java @@ -24,15 +24,12 @@ import java.util.EnumSet; import java.util.Set; public enum ONAPComponents implements ONAPComponentsList { - OPENSTACK_ADAPTER, BPMN, GRM, AAI, DMAAP, POLICY, CATALOG_DB, REQUEST_DB, SNIRO, SDC, EXTERNAL, VNF_ADAPTER, SDNC_ADAPTER, MULTICLOUD, CLAMP, PORTAL, VID, APPC, DCAE, HOLMES, SDNC, SO, VFC, ESR, DBC, DR, MR, OPTF; + OPENSTACK_ADAPTER, BPMN, GRM, AAI, POLICY, CATALOG_DB, REQUEST_DB, SNIRO, SDC, EXTERNAL, VNF_ADAPTER, SDNC_ADAPTER, MULTICLOUD, CLAMP, PORTAL, VID, APPC, DCAE, HOLMES, SDNC, SO, VFC, ESR, DBC, DR, MR, OPTF; public static Set getSOInternalComponents() { return EnumSet.of(OPENSTACK_ADAPTER, BPMN, CATALOG_DB, REQUEST_DB, VNF_ADAPTER, SDNC_ADAPTER); } - public static Set getDMAAPInternalComponents() { - return EnumSet.of(DBC, DR, MR); - } public static Set getAAIInternalComponents() { return EnumSet.of(ESR); @@ -42,8 +39,6 @@ public enum ONAPComponents implements ONAPComponentsList { public String toString() { if (getSOInternalComponents().contains(this)) return SO + "." + this.name(); - else if (getDMAAPInternalComponents().contains(this)) - return DMAAP + "." + this.name(); else if (getAAIInternalComponents().contains(this)) return AAI + "." + this.name(); else diff --git a/aai-els-onap-logging/src/test/resources/error.properties b/aai-els-onap-logging/src/test/resources/error.properties index c1470dae..1f68df3f 100644 --- a/aai-els-onap-logging/src/test/resources/error.properties +++ b/aai-els-onap-logging/src/test/resources/error.properties @@ -142,8 +142,8 @@ AAI_7117=5:4:ERROR:7117:500:3002:Error in get http client object:500 AAI_7118=5:4:ERROR:7118:500:3002:Script Error:900 AAI_7119=5:4:ERROR:7119:500:3002:Unknown host:900 -#--- DMaaP related errors -AAI_7304=4:5:ERROR:7304:500:3002:Error reaching DMaaP to send event:200 +#--- Kafka related errors +AAI_7304=4:5:ERROR:7304:500:3002:Error reaching Kafka to send event:200 AAI_7350=5:4:ERROR:7305:500:3002:Notification event creation failed:500 #--- aairestctlr: 7401-7499 diff --git a/aai-parent/pom.xml b/aai-parent/pom.xml index 2b5c191a..01d58f32 100644 --- a/aai-parent/pom.xml +++ b/aai-parent/pom.xml @@ -57,7 +57,6 @@ limitations under the License. 3.8.0 1.10.0 0.40.2 - 1.1.12 5.0.0 2.7.11 2.0.0-oss @@ -627,12 +626,6 @@ limitations under the License. ${io.swagger.version} - - org.onap.dmaap.messagerouter.dmaapclient - dmaapClient - ${dmaap.client.version} - - io.netty netty-handler @@ -855,6 +848,9 @@ limitations under the License. org.apache.maven.plugins maven-checkstyle-plugin 3.0.0 + + True + diff --git a/docs/AAI REST API Documentation/AAIRESTAPI.rst b/docs/AAI REST API Documentation/AAIRESTAPI.rst index c4972e43..28d86273 100644 --- a/docs/AAI REST API Documentation/AAIRESTAPI.rst +++ b/docs/AAI REST API Documentation/AAIRESTAPI.rst @@ -108,7 +108,7 @@ Beijing (v13) API changes ~~~~~~~~~~~ -- DELETE request will generate a DMAAP event for each node deleted +- DELETE request will generate a KAFKA event for each node deleted (not just the for which the DELETE request was made) - Relationship list @@ -188,7 +188,7 @@ Event Specific: interest. - In v11, clients that require lineage, children, or relationship - information need to subscribe to a different DMaaP topic than the + information need to subscribe to a different Kafka topic than the current one. Relationship List -- cgit 1.2.3-korg