aboutsummaryrefslogtreecommitdiffstats
path: root/aai-core
diff options
context:
space:
mode:
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-07-22 14:21:26 +0200
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-07-23 09:44:23 +0200
commit2513e6ea205ca1a6d1c7ba13b8b448ab649966c2 (patch)
treef0a8507c0d2b7f570952ec1c7a22db04ca9e9858 /aai-core
parent405369a8be85f53208cc97a44d8fb3942313e2e7 (diff)
Make JMS-based messaging compatible with tracing
- use dependency injection instead of new Foo() for jms related classes - inject interfaces and not their implementations - add integration test that asserts message sending via JMS to Kafka [1] [1] this also prepares removal of ActiveMQ as a middleman Issue-ID: AAI-3932 Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372 Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
Diffstat (limited to 'aai-core')
-rw-r--r--aai-core/pom.xml5
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java224
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java40
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java10
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java40
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java313
-rw-r--r--aai-core/src/main/resources/logback.xml10
-rw-r--r--aai-core/src/test/java/org/onap/aai/AAISetup.java4
-rw-r--r--aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java97
-rw-r--r--aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java89
-rw-r--r--aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java77
-rw-r--r--aai-core/src/test/resources/logback.xml10
-rw-r--r--aai-core/src/test/resources/payloads/expected/aai-event.json61
13 files changed, 560 insertions, 420 deletions
diff --git a/aai-core/pom.xml b/aai-core/pom.xml
index bd759f1f..c091546b 100644
--- a/aai-core/pom.xml
+++ b/aai-core/pom.xml
@@ -388,6 +388,11 @@ limitations under the License.
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>javax.jms-api</artifactId>
+ <version>2.0.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
index 731f3dfc..67f6842e 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
@@ -20,116 +20,114 @@
* ============LICENSE_END=========================================================
*/
- package org.onap.aai.kafka;
-
- import java.util.Map;
- import java.util.Objects;
-
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
-
- import org.json.JSONException;
- import org.json.JSONObject;
- import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
- import org.onap.aai.exceptions.AAIException;
- import org.onap.aai.logging.AaiElsErrorCode;
- import org.onap.aai.logging.ErrorLogHelper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.core.env.Environment;
- import org.springframework.kafka.core.KafkaTemplate;
-
- public class AAIKafkaEventJMSConsumer implements MessageListener {
-
- private static final String EVENT_TOPIC = "event-topic";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
-
- private Environment environment;
- private Map<String, String> mdcCopy;
- private KafkaTemplate<String,String> kafkaTemplate;
-
- public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
- super();
- mdcCopy = MDC.getCopyOfContextMap();
- Objects.nonNull(environment);
- this.environment = environment;
- this.kafkaTemplate=kafkaTemplate;
- }
-
- @Override
- public void onMessage(Message message) {
-
- if (kafkaTemplate == null) {
- return;
- }
-
- String jsmMessageTxt = "";
- String aaiEvent = "";
- JSONObject aaiEventHeader;
- JSONObject joPayload;
- String transactionId = "";
- String serviceName = "";
- String eventName = "";
- String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
- String errorDescription = "";
-
- if (mdcCopy != null) {
- MDC.setContextMap(mdcCopy);
- }
-
- if (message instanceof TextMessage) {
- AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
- try {
- jsmMessageTxt = ((TextMessage) message).getText();
- JSONObject jo = new JSONObject(jsmMessageTxt);
- if (jo.has("aaiEventPayload")) {
- joPayload = jo.getJSONObject("aaiEventPayload");
- aaiEvent = joPayload.toString();
- } else {
- return;
- }
- if (jo.getString(EVENT_TOPIC) != null) {
- eventName = jo.getString(EVENT_TOPIC);
- }
- if (joPayload.has("event-header")) {
- try {
- aaiEventHeader = joPayload.getJSONObject("event-header");
- if (aaiEventHeader.has("id")) {
- transactionId = aaiEventHeader.get("id").toString();
- }
- if (aaiEventHeader.has("entity-link")) {
- serviceName = aaiEventHeader.get("entity-link").toString();
- }
- } catch (JSONException jexc) {
- // ignore, this is just used for logging
- }
- }
- metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-
-
- if ("AAI-EVENT".equals(eventName)) {
- // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
- kafkaTemplate.send(eventName,aaiEvent);
-
- } else {
- LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
- }
- } catch (JMSException | JSONException e) {
- aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7350"));
- } catch (Exception e) {
- aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
- } finally {
- metricLog.post(aaiElsErrorCode, errorDescription);
- }
- }
- }
- }
- \ No newline at end of file
+package org.onap.aai.kafka;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+import org.onap.aai.exceptions.AAIException;
+import org.onap.aai.logging.AaiElsErrorCode;
+import org.onap.aai.logging.ErrorLogHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.kafka.core.KafkaTemplate;
+
+
+public class AAIKafkaEventJMSConsumer implements MessageListener {
+
+ private static final String EVENT_TOPIC = "event-topic";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+
+ private Map<String, String> mdcCopy;
+ private final KafkaTemplate<String, String> kafkaTemplate;
+
+ public AAIKafkaEventJMSConsumer(KafkaTemplate<String, String> kafkaTemplate) {
+ super();
+ mdcCopy = MDC.getCopyOfContextMap();
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+
+ if (kafkaTemplate == null) {
+ return;
+ }
+
+ String jmsMessageText = "";
+ String aaiEvent = "";
+ JSONObject aaiEventHeader;
+ JSONObject aaiEventPayload;
+ String transactionId = "";
+ String serviceName = "";
+ String topicName = "";
+ String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+ String errorDescription = "";
+
+ if (mdcCopy != null) {
+ MDC.setContextMap(mdcCopy);
+ }
+
+ if (message instanceof TextMessage) {
+ AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+ try {
+ jmsMessageText = ((TextMessage) message).getText();
+ JSONObject jsonObject = new JSONObject(jmsMessageText);
+ if (jsonObject.has("aaiEventPayload")) {
+ aaiEventPayload = jsonObject.getJSONObject("aaiEventPayload");
+ aaiEvent = aaiEventPayload.toString();
+ } else {
+ return;
+ }
+ if (jsonObject.getString(EVENT_TOPIC) != null) {
+ topicName = jsonObject.getString(EVENT_TOPIC);
+ }
+ if (aaiEventPayload.has("event-header")) {
+ try {
+ aaiEventHeader = aaiEventPayload.getJSONObject("event-header");
+ if (aaiEventHeader.has("id")) {
+ transactionId = aaiEventHeader.get("id").toString();
+ }
+ if (aaiEventHeader.has("entity-link")) {
+ serviceName = aaiEventHeader.get("entity-link").toString();
+ }
+ } catch (JSONException jexc) {
+ // ignore, this is just used for logging
+ }
+ }
+ metricLog.pre(topicName, aaiEvent, transactionId, serviceName);
+
+ if ("AAI-EVENT".equals(topicName)) {
+
+ kafkaTemplate.send(topicName, aaiEvent);
+
+ } else {
+ LOGGER.error(String.format("%s|Event Topic invalid.", topicName));
+ }
+ } catch (JMSException | JSONException e) {
+ aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7350"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ // LOGGER.error();
+ LOGGER.error(e.getMessage());
+ aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+ errorDescription = e.getMessage();
+ String errorMessage = String.format("Error processing message: %s, message payload: %s", e.getMessage(), jmsMessageText);
+ ErrorLogHelper.logException(new AAIException("AAI_7304", errorMessage));
+ } finally {
+ metricLog.post(aaiElsErrorCode, errorDescription);
+ }
+ }
+ }
+}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
index 00cf677f..a46f2e99 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
@@ -22,44 +22,28 @@
package org.onap.aai.kafka;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
import org.json.JSONObject;
import org.onap.aai.util.AAIConfig;
-import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.stereotype.Service;
+import lombok.RequiredArgsConstructor;
+
+@Service
+@RequiredArgsConstructor
public class AAIKafkaEventJMSProducer implements MessageProducer {
- private JmsTemplate jmsTemplate;
+ @Value("${aai.events.enabled:true}") private boolean eventsEnabled;
+ private final JmsTemplate jmsTemplate;
- public AAIKafkaEventJMSProducer() {
- if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
- this.jmsTemplate = new JmsTemplate();
- String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
- this.jmsTemplate
- .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl)));
- this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE"));
+ public void sendMessageToDefaultDestination(String msg) {
+ if (eventsEnabled) {
+ jmsTemplate.convertAndSend(msg);
}
}
public void sendMessageToDefaultDestination(JSONObject finalJson) {
- if (jmsTemplate != null) {
- jmsTemplate.convertAndSend(finalJson.toString());
- CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
- if (ccf != null) {
- ccf.destroy();
- }
- }
- }
-
- public void sendMessageToDefaultDestination(String msg) {
- if (jmsTemplate != null) {
- jmsTemplate.convertAndSend(msg);
- CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
- if (ccf != null) {
- ccf.destroy();
- }
- }
+ sendMessageToDefaultDestination(finalJson.toString());
}
}
diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
index 6f3e8883..127cf538 100644
--- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
+++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
@@ -39,14 +39,18 @@ import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
import org.onap.aai.kafka.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment;
+import org.springframework.jms.core.JmsTemplate;
public class StoreNotificationEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(StoreNotificationEvent.class);
- private MessageProducer messageProducer;
+ @Autowired JmsTemplate jmsTemplate;
+
+ private final MessageProducer messageProducer;
private String fromAppId = "";
private String transId = "";
private final String transactionId;
@@ -59,12 +63,12 @@ public class StoreNotificationEvent {
* Instantiates a new store notification event.
*/
public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
- this.messageProducer = new AAIKafkaEventJMSProducer();
+ this.messageProducer = new AAIKafkaEventJMSProducer(jmsTemplate);
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
}
- public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+ public StoreNotificationEvent(MessageProducer producer, String transactionId, String sourceOfTruth) {
this.messageProducer = producer;
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
index 6fbc297b..b2821b04 100644
--- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
+++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
@@ -20,7 +20,6 @@
package org.onap.aai.util.delta;
-import com.google.gson.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -28,38 +27,35 @@ import java.util.Date;
import java.util.Map;
import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
import org.onap.aai.kafka.MessageProducer;
import org.onap.aai.util.AAIConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
-public class DeltaEvents {
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
- private static final Logger LOGGER = LoggerFactory.getLogger(DeltaEvents.class);
+public class DeltaEvents {
private static final Gson gson =
new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES).create();
+ private static final String eventVersion = "v1";
- private String transId;
- private String sourceName;
- private String eventVersion = "v1";
- private String schemaVersion;
- private Map<String, ObjectDelta> objectDeltas;
+ private final String transId;
+ private final String sourceName;
+ private final String schemaVersion;
+ private final Map<String, ObjectDelta> objectDeltas;
- private MessageProducer messageProducer;
+ @Autowired private MessageProducer messageProducer;
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
- this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
- }
-
- public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
- MessageProducer messageProducer) {
- this.transId = transId;
- this.sourceName = sourceName;
- this.schemaVersion = schemaVersion;
- this.objectDeltas = objectDeltas;
- this.messageProducer = messageProducer;
+ this.transId = transId;
+ this.sourceName = sourceName;
+ this.schemaVersion = schemaVersion;
+ this.objectDeltas = objectDeltas;
}
public boolean triggerEvents() {
@@ -98,7 +94,7 @@ public class DeltaEvents {
header.addProperty("source-name", this.sourceName);
header.addProperty("domain", this.getDomain());
header.addProperty("event-type", this.getEventType());
- header.addProperty("event-version", this.eventVersion);
+ header.addProperty("event-version", eventVersion);
header.addProperty("schema-version", this.schemaVersion);
header.addProperty("action", first.getAction().toString());
header.addProperty("entity-type", this.getEntityType(first));
@@ -126,7 +122,7 @@ public class DeltaEvents {
/**
* Given Long timestamp convert to format YYYYMMdd-HH:mm:ss:SSS
- *
+ *
* @param timestamp milliseconds since epoc
* @return long timestamp in format YYYYMMdd-HH:mm:ss:SSS
*/
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
index 71ae5b6b..05e4768d 100644
--- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
@@ -18,158 +18,167 @@
* ============LICENSE_END=========================================================
*/
- package org.onap.aai.web;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import javax.annotation.PostConstruct;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.broker.BrokerService;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.kafka.clients.producer.ProducerConfig;
+package org.onap.aai.web;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.introspection.LoaderFactory;
import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
+import org.onap.aai.rest.notification.NotificationService;
import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Profile;
- import org.springframework.jms.connection.CachingConnectionFactory;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.listener.DefaultMessageListenerContainer;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
-
- @Profile("kafka")
- @Configuration
- public class KafkaConfig {
-
- @Autowired
- private ApplicationContext ctx;
-
-
- @Value("${jms.bind.address}")
- private String bindAddress;
-
- @Value("${spring.kafka.producer.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${spring.kafka.producer.properties.security.protocol}")
- private String securityProtocol;
-
- @Value("${spring.kafka.producer.properties.sasl.mechanism}")
- private String saslMechanism;
-
- @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
- private String saslJaasConfig;
-
- @Value("${spring.kafka.producer.retries}")
- private Integer retries;
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
-
- @PostConstruct
- public void init() {
- System.setProperty("activemq.tcp.url", bindAddress);
- }
-
- @Bean(destroyMethod = "stop")
- public BrokerService brokerService() throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setSchedulerSupport(false);
- broker.start();
-
- return broker;
- }
-
- @Bean(name = "connectionFactory")
- public ActiveMQConnectionFactory activeMQConnectionFactory() {
- return new ActiveMQConnectionFactory(bindAddress);
- }
-
- @Bean
- public CachingConnectionFactory cachingConnectionFactory() {
- return new CachingConnectionFactory(activeMQConnectionFactory());
- }
-
- @Bean(name = "destinationQueue")
- public ActiveMQQueue activeMQQueue() {
- return new ActiveMQQueue("IN_QUEUE");
- }
-
- @Bean
- public JmsTemplate jmsTemplate() {
- JmsTemplate jmsTemplate = new JmsTemplate();
-
- jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
- jmsTemplate.setDefaultDestination(activeMQQueue());
-
- return jmsTemplate;
- }
-
- @Bean
- public AAIKafkaEventJMSProducer jmsProducer() {
- return new AAIKafkaEventJMSProducer();
- }
-
- @Bean(name = "jmsConsumer")
- public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
- return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
- }
-
- @Bean
- public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-
- DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-
- messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
- messageListenerContainer.setDestinationName("IN_QUEUE");
- messageListenerContainer.setMessageListener(jmsConsumer());
-
- return messageListenerContainer;
- }
-
- @Bean
- public ProducerFactory<String, String> producerFactory() throws Exception {
- Map<String, Object> props = new HashMap<>();
- if(bootstrapServers == null){
- logger.error("Environment Variable " + bootstrapServers + " is missing");
- throw new Exception("Environment Variable " + bootstrapServers + " is missing");
- }
- else{
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- }
- if(saslJaasConfig == null){
- logger.info("Not using any authentication for kafka interaction");
- }
- else{
- logger.info("Using authentication provided by kafka interaction");
- // Strimzi Kafka security properties
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("security.protocol", securityProtocol);
- props.put("sasl.mechanism", saslMechanism);
- props.put("sasl.jaas.config", saslJaasConfig);
- props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
- }
-
- return new DefaultKafkaProducerFactory<>(props);
- }
-
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
- return new KafkaTemplate<>(producerFactory());
- }
- }
- \ No newline at end of file
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+@Profile("kafka")
+@Configuration
+public class KafkaConfig {
+
+ @Value("${jms.bind.address}")
+ private String bindAddress;
+
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.producer.properties.security.protocol}")
+ private String securityProtocol;
+
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+ private String saslJaasConfig;
+
+ @Value("${spring.kafka.producer.retries}")
+ private String retries;
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+
+ @PostConstruct
+ public void init() {
+ System.setProperty("activemq.tcp.url", bindAddress);
+ }
+
+ @Bean(destroyMethod = "stop")
+ public BrokerService brokerService() throws Exception {
+
+ BrokerService broker = new BrokerService();
+ broker.addConnector(bindAddress);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setSchedulerSupport(false);
+ broker.start();
+
+ return broker;
+ }
+
+ @ConditionalOnMissingBean
+ @Bean(name = "connectionFactory")
+ public ConnectionFactory activeMQConnectionFactory() {
+ return new ActiveMQConnectionFactory(bindAddress);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
+ return new CachingConnectionFactory(targetConnectionFactory);
+ }
+
+ @Bean(name = "destinationQueue")
+ public Queue activeMQQueue() {
+ return new ActiveMQQueue("IN_QUEUE");
+ }
+
+ @Bean
+ public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, Queue queue) {
+ JmsTemplate jmsTemplate = new JmsTemplate();
+
+ jmsTemplate.setConnectionFactory(connectionFactory);
+ jmsTemplate.setDefaultDestination(queue);
+
+ return jmsTemplate;
+ }
+
+ @Bean(name = "jmsConsumer")
+ public MessageListener jmsConsumer(KafkaTemplate<String, String> kafkaTemplate) throws Exception {
+ return new AAIKafkaEventJMSConsumer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener)
+ throws Exception {
+
+ DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+
+ messageListenerContainer.setConnectionFactory(connectionFactory);
+ messageListenerContainer.setDestinationName("IN_QUEUE");
+ messageListenerContainer.setMessageListener(messageListener);
+
+ return messageListenerContainer;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ if (bootstrapServers == null) {
+ logger.error("Environment Variable " + bootstrapServers + " is missing");
+ throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+ } else {
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+
+ if (saslJaasConfig == null) {
+ logger.info("Not using any authentication for kafka interaction");
+ } else {
+ logger.info("Using authentication provided by kafka interaction");
+ // Strimzi Kafka security properties
+ props.put("security.protocol", securityProtocol);
+ props.put("sasl.mechanism", saslMechanism);
+ props.put("sasl.jaas.config", saslJaasConfig);
+ }
+
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) throws Exception {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public MessageProducer messageProducer(JmsTemplate jmsTemplate) {
+ return new AAIKafkaEventJMSProducer(jmsTemplate);
+ }
+
+ @Bean
+ public NotificationService notificationService(LoaderFactory loaderFactory,
+ @Value("${schema.uri.base.path}") String basePath,
+ @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) {
+ return new NotificationService(loaderFactory, basePath, isDeltaEventsEnabled);
+ }
+}
diff --git a/aai-core/src/main/resources/logback.xml b/aai-core/src/main/resources/logback.xml
index fc66b32e..ba5b3de8 100644
--- a/aai-core/src/main/resources/logback.xml
+++ b/aai-core/src/main/resources/logback.xml
@@ -166,7 +166,7 @@
<pattern>${eelfTransLogPattern}</pattern>
</encoder>
</appender>
-
+
<appender name="asynctranslog" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1000</queueSize>
<includeCallerData>true</includeCallerData>
@@ -258,7 +258,7 @@
<logger name="ajsc.UserDefinedBeansDefService" level="WARN" />
<logger name="ajsc.LoggingConfigurationService" level="WARN" />
- <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
+ <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
logging) -->
<logger name="org.codehaus.groovy" level="WARN" />
<logger name="com.att.scamper" level="WARN" />
@@ -282,7 +282,7 @@
<logger name="org.apache.coyote" level="WARN" />
<logger name="org.apache.jasper" level="WARN" />
- <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
+ <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
May aid in troubleshooting) -->
<logger name="org.apache.camel" level="WARN" />
<logger name="org.apache.cxf" level="WARN" />
@@ -366,9 +366,7 @@
</logger>
<logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
- <appender-ref ref="kafkaAAIEventConsumer" />
- <appender-ref ref="kafkaAAIEventConsumerDebug" />
- <appender-ref ref="kafkaAAIEventConsumerMetric" />
+ <appender-ref ref="STDOUT" />
</logger>
<logger name="org.apache" level="OFF" />
diff --git a/aai-core/src/test/java/org/onap/aai/AAISetup.java b/aai-core/src/test/java/org/onap/aai/AAISetup.java
index 16f21ff4..08a0e91b 100644
--- a/aai-core/src/test/java/org/onap/aai/AAISetup.java
+++ b/aai-core/src/test/java/org/onap/aai/AAISetup.java
@@ -40,6 +40,7 @@ import org.onap.aai.setup.AAIConfigTranslator;
import org.onap.aai.setup.SchemaVersion;
import org.onap.aai.setup.SchemaVersions;
import org.onap.aai.util.AAIConstants;
+import org.onap.aai.web.KafkaConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.ContextConfiguration;
@@ -52,7 +53,8 @@ import org.springframework.test.context.web.WebAppConfiguration;
@ContextConfiguration(
classes = {ConfigConfiguration.class, AAIConfigTranslator.class, EdgeIngestor.class, EdgeSerializer.class,
NodeIngestor.class, SpringContextAware.class, IntrospectionConfig.class, RestBeanConfig.class,
- XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class, LoaderFactory.class, NotificationService.class})
+ XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class,
+ KafkaConfig.class, LoaderFactory.class, NotificationService.class})
@TestPropertySource(
properties = {"schema.uri.base.path = /aai", "schema.xsd.maxoccurs = 5000", "schema.translator.list=config",
"schema.nodes.location=src/test/resources/onap/oxm",
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
new file mode 100644
index 00000000..c10260da
--- /dev/null
+++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
@@ -0,0 +1,97 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.onap.aai.AAISetup;
+import org.onap.aai.PayloadUtil;
+import org.onap.aai.restcore.HttpMethod;
+import org.skyscreamer.jsonassert.JSONAssert;
+import org.skyscreamer.jsonassert.JSONCompareMode;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@ActiveProfiles("kafka")
+@Import(KafkaTestConfiguration.class)
+@EmbeddedKafka(partitions = 1, topics = { "AAI-EVENT" })
+@TestPropertySource(
+ properties = {
+ "jms.bind.address=tcp://localhost:61647",
+ "aai.events.enabled=true",
+ "spring.kafka.producer.retries=0",
+ "spring.kafka.producer.properties.sasl.jaas.config=#{null}",
+ "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}"
+ })
+public class AAIKafkaEventIntegrationTest extends AAISetup {
+
+ @Mock
+ private KafkaTemplate<String, String> kafkaTemplate;
+
+ @Autowired
+ MessageProducer messageProducer;
+
+ @Autowired
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @Test
+ public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
+ throws Exception {
+ Consumer<String, String> consumer = consumerFactory.createConsumer();
+
+ consumer.subscribe(Collections.singletonList("AAI-EVENT"));
+
+ String payload = PayloadUtil.getResourcePayload("aai-event.json");
+ String expectedResponse = PayloadUtil.getExpectedPayload("aai-event.json");
+ messageProducer.sendMessageToDefaultDestination(payload);
+
+ ConsumerRecords<String, String> consumerRecords = KafkaTestUtils.getRecords(consumer, 10000);
+ assertFalse(consumerRecords.isEmpty());
+ consumerRecords.forEach(consumerRecord -> {
+ JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.NON_EXTENSIBLE);
+ });
+ }
+
+}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
deleted file mode 100644
index c72499c4..00000000
--- a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package org.onap.aai.kafka;
-
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import javax.jms.TextMessage;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.onap.aai.PayloadUtil;
-import org.springframework.core.env.Environment;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.util.ReflectionTestUtils;
-
-@RunWith(MockitoJUnitRunner.class)
-@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
-public class AAIKafkaEventJMSConsumerTest {
-
- @Mock
- private Environment environment;
-
- @Mock
- private KafkaTemplate<String,String> kafkaTemplate;
-
- private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer;
-
- @Before
- public void setUp(){
- aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate);
- }
-
- @Test
- public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
- throws Exception
- {
- TextMessage mockTextMessage = mock(TextMessage.class);
- String payload = PayloadUtil.getResourcePayload("aai-event.json");
-
- when(mockTextMessage.getText()).thenReturn(payload);
- aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
- verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString());
- }
-
- @Test
- public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{
- TextMessage mockTextMessage = mock(TextMessage.class);
- String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json");
- when(mockTextMessage.getText()).thenReturn(payload);
- aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
- }
-
-
- @Test
- public void onMessage_shouldHandleJSONException() throws Exception {
- // Arrange
- AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
- TextMessage mockTextMessage = mock(TextMessage.class);
- ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate
-
- // Act
- consumer.onMessage(mockTextMessage);
-
- // Assert
- // Verify that exception is logged
- }
-
- @Test
- public void onMessage_shouldHandleGenericException() throws Exception {
- // Arrange
- AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
- TextMessage mockTextMessage = mock(TextMessage.class);
- when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields
-
- // Act
- consumer.onMessage(mockTextMessage);
-
- // Assert
- // Verify that exception is logged
- }
-
-}
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java
new file mode 100644
index 00000000..730699e6
--- /dev/null
+++ b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java
@@ -0,0 +1,77 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2024 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.test.context.TestPropertySource;
+
+@TestConfiguration
+public class KafkaTestConfiguration {
+
+ @Value("${spring.embedded.kafka.brokers}") private String bootstrapAddress;
+
+ private String groupId = "test-consumer";
+
+ @Bean
+ public KafkaAdmin kafkaAdmin() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ return new KafkaAdmin(configs);
+ }
+
+ @Bean
+ public ConsumerFactory<String, String> consumerFactory() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapAddress);
+ props.put(
+ ConsumerConfig.GROUP_ID_CONFIG,
+ groupId);
+ props.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class);
+ props.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new DefaultKafkaConsumerFactory<>(props);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
+
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory);
+ return factory;
+ }
+}
diff --git a/aai-core/src/test/resources/logback.xml b/aai-core/src/test/resources/logback.xml
index 4c82c0bf..6acc77f0 100644
--- a/aai-core/src/test/resources/logback.xml
+++ b/aai-core/src/test/resources/logback.xml
@@ -163,7 +163,7 @@
<pattern>${eelfTransLogPattern}</pattern>
</encoder>
</appender>
-
+
<appender name="asynctranslog" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1000</queueSize>
<includeCallerData>true</includeCallerData>
@@ -255,7 +255,7 @@
<logger name="ajsc.UserDefinedBeansDefService" level="WARN" />
<logger name="ajsc.LoggingConfigurationService" level="WARN" />
- <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
+ <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
logging) -->
<logger name="org.codehaus.groovy" level="WARN" />
<logger name="com.att.scamper" level="WARN" />
@@ -279,7 +279,7 @@
<logger name="org.apache.coyote" level="WARN" />
<logger name="org.apache.jasper" level="WARN" />
- <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
+ <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
May aid in troubleshooting) -->
<logger name="org.apache.camel" level="WARN" />
<logger name="org.apache.cxf" level="WARN" />
@@ -363,9 +363,7 @@
</logger>
<logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
- <appender-ref ref="kafkaAAIEventConsumer" />
- <appender-ref ref="kafkaAAIEventConsumerDebug" />
- <appender-ref ref="kafkaAAIEventConsumerMetric" />
+ <appender-ref ref="STDOUT" />
</logger>
<logger name="org.apache" level="WARN" />
diff --git a/aai-core/src/test/resources/payloads/expected/aai-event.json b/aai-core/src/test/resources/payloads/expected/aai-event.json
new file mode 100644
index 00000000..86c67992
--- /dev/null
+++ b/aai-core/src/test/resources/payloads/expected/aai-event.json
@@ -0,0 +1,61 @@
+{
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "object-group",
+ "top-entity-type": "object-group",
+ "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+ "event-type": "AAI-EVENT",
+ "domain": "dev",
+ "action": "UPDATE",
+ "sequence-number": "0",
+ "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+ "source-name": "postman-api",
+ "version": "v28",
+ "timestamp": "20231207-12:14:44:757"
+ },
+ "entity": {
+ "relationship-list": {
+ "relationship": [
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193273958916",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+ }
+ ]
+ },
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193272330241",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+ }
+ ]
+ }
+ ]
+ },
+ "group-name": "Urban",
+ "resource-version": "1701951284582",
+ "group-type": "cell",
+ "object-group-id": "ric_cluster"
+ }
+}