From 2513e6ea205ca1a6d1c7ba13b8b448ab649966c2 Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Mon, 22 Jul 2024 14:21:26 +0200 Subject: Make JMS-based messaging compatible with tracing - use dependency injection instead of new Foo() for jms related classes - inject interfaces and not their implementations - add integration test that asserts message sending via JMS to Kafka [1] [1] this also prepares removal of ActiveMQ as a middleman Issue-ID: AAI-3932 Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372 Signed-off-by: Fiete Ostkamp --- aai-core/pom.xml | 5 + .../onap/aai/kafka/AAIKafkaEventJMSConsumer.java | 224 ++++++++------- .../onap/aai/kafka/AAIKafkaEventJMSProducer.java | 40 +-- .../org/onap/aai/util/StoreNotificationEvent.java | 10 +- .../java/org/onap/aai/util/delta/DeltaEvents.java | 40 ++- .../main/java/org/onap/aai/web/KafkaConfig.java | 313 +++++++++++---------- aai-core/src/main/resources/logback.xml | 10 +- aai-core/src/test/java/org/onap/aai/AAISetup.java | 4 +- .../aai/kafka/AAIKafkaEventIntegrationTest.java | 97 +++++++ .../aai/kafka/AAIKafkaEventJMSConsumerTest.java | 89 ------ .../org/onap/aai/kafka/KafkaTestConfiguration.java | 77 +++++ aai-core/src/test/resources/logback.xml | 10 +- .../resources/payloads/expected/aai-event.json | 61 ++++ 13 files changed, 560 insertions(+), 420 deletions(-) create mode 100644 aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java delete mode 100644 aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java create mode 100644 aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java create mode 100644 aai-core/src/test/resources/payloads/expected/aai-event.json (limited to 'aai-core') diff --git a/aai-core/pom.xml b/aai-core/pom.xml index bd759f1f..c091546b 100644 --- a/aai-core/pom.xml +++ b/aai-core/pom.xml @@ -387,6 +387,11 @@ limitations under the License. org.springframework spring-jms + + javax.jms + javax.jms-api + 2.0.1 + com.fasterxml.jackson.core jackson-core diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java index 731f3dfc..67f6842e 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java @@ -20,116 +20,114 @@ * ============LICENSE_END========================================================= */ - package org.onap.aai.kafka; - - import java.util.Map; - import java.util.Objects; - - import javax.jms.JMSException; - import javax.jms.Message; - import javax.jms.MessageListener; - import javax.jms.TextMessage; - - import org.json.JSONException; - import org.json.JSONObject; - import org.onap.aai.aailog.logs.AaiDmaapMetricLog; - import org.onap.aai.exceptions.AAIException; - import org.onap.aai.logging.AaiElsErrorCode; - import org.onap.aai.logging.ErrorLogHelper; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - import org.slf4j.MDC; - import org.springframework.core.env.Environment; - import org.springframework.kafka.core.KafkaTemplate; - - public class AAIKafkaEventJMSConsumer implements MessageListener { - - private static final String EVENT_TOPIC = "event-topic"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); - - private Environment environment; - private Map mdcCopy; - private KafkaTemplate kafkaTemplate; - - public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate kafkaTemplate) { - super(); - mdcCopy = MDC.getCopyOfContextMap(); - Objects.nonNull(environment); - this.environment = environment; - this.kafkaTemplate=kafkaTemplate; - } - - @Override - public void onMessage(Message message) { - - if (kafkaTemplate == null) { - return; - } - - String jsmMessageTxt = ""; - String aaiEvent = ""; - JSONObject aaiEventHeader; - JSONObject joPayload; - String transactionId = ""; - String serviceName = ""; - String eventName = ""; - String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; - String errorDescription = ""; - - if (mdcCopy != null) { - MDC.setContextMap(mdcCopy); - } - - if (message instanceof TextMessage) { - AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); - try { - jsmMessageTxt = ((TextMessage) message).getText(); - JSONObject jo = new JSONObject(jsmMessageTxt); - if (jo.has("aaiEventPayload")) { - joPayload = jo.getJSONObject("aaiEventPayload"); - aaiEvent = joPayload.toString(); - } else { - return; - } - if (jo.getString(EVENT_TOPIC) != null) { - eventName = jo.getString(EVENT_TOPIC); - } - if (joPayload.has("event-header")) { - try { - aaiEventHeader = joPayload.getJSONObject("event-header"); - if (aaiEventHeader.has("id")) { - transactionId = aaiEventHeader.get("id").toString(); - } - if (aaiEventHeader.has("entity-link")) { - serviceName = aaiEventHeader.get("entity-link").toString(); - } - } catch (JSONException jexc) { - // ignore, this is just used for logging - } - } - metricLog.pre(eventName, aaiEvent, transactionId, serviceName); - - - if ("AAI-EVENT".equals(eventName)) { - // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); - kafkaTemplate.send(eventName,aaiEvent); - - } else { - LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); - } - } catch (JMSException | JSONException e) { - aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7350")); - } catch (Exception e) { - aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); - } finally { - metricLog.post(aaiElsErrorCode, errorDescription); - } - } - } - } - \ No newline at end of file +package org.onap.aai.kafka; + +import java.util.Map; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.json.JSONException; +import org.json.JSONObject; +import org.onap.aai.aailog.logs.AaiDmaapMetricLog; +import org.onap.aai.exceptions.AAIException; +import org.onap.aai.logging.AaiElsErrorCode; +import org.onap.aai.logging.ErrorLogHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.kafka.core.KafkaTemplate; + + +public class AAIKafkaEventJMSConsumer implements MessageListener { + + private static final String EVENT_TOPIC = "event-topic"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); + + private Map mdcCopy; + private final KafkaTemplate kafkaTemplate; + + public AAIKafkaEventJMSConsumer(KafkaTemplate kafkaTemplate) { + super(); + mdcCopy = MDC.getCopyOfContextMap(); + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public void onMessage(Message message) { + + if (kafkaTemplate == null) { + return; + } + + String jmsMessageText = ""; + String aaiEvent = ""; + JSONObject aaiEventHeader; + JSONObject aaiEventPayload; + String transactionId = ""; + String serviceName = ""; + String topicName = ""; + String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; + String errorDescription = ""; + + if (mdcCopy != null) { + MDC.setContextMap(mdcCopy); + } + + if (message instanceof TextMessage) { + AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); + try { + jmsMessageText = ((TextMessage) message).getText(); + JSONObject jsonObject = new JSONObject(jmsMessageText); + if (jsonObject.has("aaiEventPayload")) { + aaiEventPayload = jsonObject.getJSONObject("aaiEventPayload"); + aaiEvent = aaiEventPayload.toString(); + } else { + return; + } + if (jsonObject.getString(EVENT_TOPIC) != null) { + topicName = jsonObject.getString(EVENT_TOPIC); + } + if (aaiEventPayload.has("event-header")) { + try { + aaiEventHeader = aaiEventPayload.getJSONObject("event-header"); + if (aaiEventHeader.has("id")) { + transactionId = aaiEventHeader.get("id").toString(); + } + if (aaiEventHeader.has("entity-link")) { + serviceName = aaiEventHeader.get("entity-link").toString(); + } + } catch (JSONException jexc) { + // ignore, this is just used for logging + } + } + metricLog.pre(topicName, aaiEvent, transactionId, serviceName); + + if ("AAI-EVENT".equals(topicName)) { + + kafkaTemplate.send(topicName, aaiEvent); + + } else { + LOGGER.error(String.format("%s|Event Topic invalid.", topicName)); + } + } catch (JMSException | JSONException e) { + aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7350")); + } catch (Exception e) { + e.printStackTrace(); + // LOGGER.error(); + LOGGER.error(e.getMessage()); + aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; + errorDescription = e.getMessage(); + String errorMessage = String.format("Error processing message: %s, message payload: %s", e.getMessage(), jmsMessageText); + ErrorLogHelper.logException(new AAIException("AAI_7304", errorMessage)); + } finally { + metricLog.post(aaiElsErrorCode, errorDescription); + } + } + } +} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java index 00cf677f..a46f2e99 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -22,44 +22,28 @@ package org.onap.aai.kafka; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQQueue; import org.json.JSONObject; import org.onap.aai.util.AAIConfig; -import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.core.JmsTemplate; +import org.springframework.stereotype.Service; +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor public class AAIKafkaEventJMSProducer implements MessageProducer { - private JmsTemplate jmsTemplate; + @Value("${aai.events.enabled:true}") private boolean eventsEnabled; + private final JmsTemplate jmsTemplate; - public AAIKafkaEventJMSProducer() { - if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { - this.jmsTemplate = new JmsTemplate(); - String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); - this.jmsTemplate - .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl))); - this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); + public void sendMessageToDefaultDestination(String msg) { + if (eventsEnabled) { + jmsTemplate.convertAndSend(msg); } } public void sendMessageToDefaultDestination(JSONObject finalJson) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(finalJson.toString()); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } - } - - public void sendMessageToDefaultDestination(String msg) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(msg); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } + sendMessageToDefaultDestination(finalJson.toString()); } } diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java index 6f3e8883..127cf538 100644 --- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java @@ -39,14 +39,18 @@ import org.onap.aai.kafka.AAIKafkaEventJMSProducer; import org.onap.aai.kafka.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.core.env.Environment; +import org.springframework.jms.core.JmsTemplate; public class StoreNotificationEvent { private static final Logger LOGGER = LoggerFactory.getLogger(StoreNotificationEvent.class); - private MessageProducer messageProducer; + @Autowired JmsTemplate jmsTemplate; + + private final MessageProducer messageProducer; private String fromAppId = ""; private String transId = ""; private final String transactionId; @@ -59,12 +63,12 @@ public class StoreNotificationEvent { * Instantiates a new store notification event. */ public StoreNotificationEvent(String transactionId, String sourceOfTruth) { - this.messageProducer = new AAIKafkaEventJMSProducer(); + this.messageProducer = new AAIKafkaEventJMSProducer(jmsTemplate); this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; } - public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) { + public StoreNotificationEvent(MessageProducer producer, String transactionId, String sourceOfTruth) { this.messageProducer = producer; this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java index 6fbc297b..b2821b04 100644 --- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java +++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java @@ -20,7 +20,6 @@ package org.onap.aai.util.delta; -import com.google.gson.*; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -28,38 +27,35 @@ import java.util.Date; import java.util.Map; import org.onap.aai.db.props.AAIProperties; -import org.onap.aai.kafka.AAIKafkaEventJMSProducer; import org.onap.aai.kafka.MessageProducer; import org.onap.aai.util.AAIConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; -public class DeltaEvents { +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; - private static final Logger LOGGER = LoggerFactory.getLogger(DeltaEvents.class); +public class DeltaEvents { private static final Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES).create(); + private static final String eventVersion = "v1"; - private String transId; - private String sourceName; - private String eventVersion = "v1"; - private String schemaVersion; - private Map objectDeltas; + private final String transId; + private final String sourceName; + private final String schemaVersion; + private final Map objectDeltas; - private MessageProducer messageProducer; + @Autowired private MessageProducer messageProducer; public DeltaEvents(String transId, String sourceName, String schemaVersion, Map objectDeltas) { - this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer()); - } - - public DeltaEvents(String transId, String sourceName, String schemaVersion, Map objectDeltas, - MessageProducer messageProducer) { - this.transId = transId; - this.sourceName = sourceName; - this.schemaVersion = schemaVersion; - this.objectDeltas = objectDeltas; - this.messageProducer = messageProducer; + this.transId = transId; + this.sourceName = sourceName; + this.schemaVersion = schemaVersion; + this.objectDeltas = objectDeltas; } public boolean triggerEvents() { @@ -98,7 +94,7 @@ public class DeltaEvents { header.addProperty("source-name", this.sourceName); header.addProperty("domain", this.getDomain()); header.addProperty("event-type", this.getEventType()); - header.addProperty("event-version", this.eventVersion); + header.addProperty("event-version", eventVersion); header.addProperty("schema-version", this.schemaVersion); header.addProperty("action", first.getAction().toString()); header.addProperty("entity-type", this.getEntityType(first)); @@ -126,7 +122,7 @@ public class DeltaEvents { /** * Given Long timestamp convert to format YYYYMMdd-HH:mm:ss:SSS - * + * * @param timestamp milliseconds since epoc * @return long timestamp in format YYYYMMdd-HH:mm:ss:SSS */ diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java index 71ae5b6b..05e4768d 100644 --- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -18,158 +18,167 @@ * ============LICENSE_END========================================================= */ - package org.onap.aai.web; - - import java.util.HashMap; - import java.util.Map; - - import javax.annotation.PostConstruct; - - import org.apache.activemq.ActiveMQConnectionFactory; - import org.apache.activemq.broker.BrokerService; - import org.apache.activemq.command.ActiveMQQueue; - import org.apache.kafka.clients.producer.ProducerConfig; +package org.onap.aai.web; + +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.jms.ConnectionFactory; +import javax.jms.MessageListener; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.introspection.LoaderFactory; import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; +import org.onap.aai.rest.notification.NotificationService; import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - import org.springframework.beans.factory.annotation.Autowired; - import org.springframework.beans.factory.annotation.Value; - import org.springframework.context.ApplicationContext; - import org.springframework.context.annotation.Bean; - import org.springframework.context.annotation.Configuration; - import org.springframework.context.annotation.Profile; - import org.springframework.jms.connection.CachingConnectionFactory; - import org.springframework.jms.core.JmsTemplate; - import org.springframework.jms.listener.DefaultMessageListenerContainer; - import org.springframework.kafka.core.DefaultKafkaProducerFactory; - import org.springframework.kafka.core.KafkaTemplate; - import org.springframework.kafka.core.ProducerFactory; - - @Profile("kafka") - @Configuration - public class KafkaConfig { - - @Autowired - private ApplicationContext ctx; - - - @Value("${jms.bind.address}") - private String bindAddress; - - @Value("${spring.kafka.producer.bootstrap-servers}") - private String bootstrapServers; - - @Value("${spring.kafka.producer.properties.security.protocol}") - private String securityProtocol; - - @Value("${spring.kafka.producer.properties.sasl.mechanism}") - private String saslMechanism; - - @Value("${spring.kafka.producer.properties.sasl.jaas.config}") - private String saslJaasConfig; - - @Value("${spring.kafka.producer.retries}") - private Integer retries; - - private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); - - @PostConstruct - public void init() { - System.setProperty("activemq.tcp.url", bindAddress); - } - - @Bean(destroyMethod = "stop") - public BrokerService brokerService() throws Exception { - - BrokerService broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setSchedulerSupport(false); - broker.start(); - - return broker; - } - - @Bean(name = "connectionFactory") - public ActiveMQConnectionFactory activeMQConnectionFactory() { - return new ActiveMQConnectionFactory(bindAddress); - } - - @Bean - public CachingConnectionFactory cachingConnectionFactory() { - return new CachingConnectionFactory(activeMQConnectionFactory()); - } - - @Bean(name = "destinationQueue") - public ActiveMQQueue activeMQQueue() { - return new ActiveMQQueue("IN_QUEUE"); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - - jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); - jmsTemplate.setDefaultDestination(activeMQQueue()); - - return jmsTemplate; - } - - @Bean - public AAIKafkaEventJMSProducer jmsProducer() { - return new AAIKafkaEventJMSProducer(); - } - - @Bean(name = "jmsConsumer") - public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { - return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); - } - - @Bean - public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { - - DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); - - messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); - messageListenerContainer.setDestinationName("IN_QUEUE"); - messageListenerContainer.setMessageListener(jmsConsumer()); - - return messageListenerContainer; - } - - @Bean - public ProducerFactory producerFactory() throws Exception { - Map props = new HashMap<>(); - if(bootstrapServers == null){ - logger.error("Environment Variable " + bootstrapServers + " is missing"); - throw new Exception("Environment Variable " + bootstrapServers + " is missing"); - } - else{ - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - } - if(saslJaasConfig == null){ - logger.info("Not using any authentication for kafka interaction"); - } - else{ - logger.info("Using authentication provided by kafka interaction"); - // Strimzi Kafka security properties - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("security.protocol", securityProtocol); - props.put("sasl.mechanism", saslMechanism); - props.put("sasl.jaas.config", saslJaasConfig); - props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); - } - - return new DefaultKafkaProducerFactory<>(props); - } - - @Bean - public KafkaTemplate kafkaTemplate() throws Exception { - return new KafkaTemplate<>(producerFactory()); - } - } - \ No newline at end of file +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Profile("kafka") +@Configuration +public class KafkaConfig { + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private String retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @ConditionalOnMissingBean + @Bean(name = "connectionFactory") + public ConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + @ConditionalOnMissingBean + public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory targetConnectionFactory) { + return new CachingConnectionFactory(targetConnectionFactory); + } + + @Bean(name = "destinationQueue") + public Queue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, Queue queue) { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(connectionFactory); + jmsTemplate.setDefaultDestination(queue); + + return jmsTemplate; + } + + @Bean(name = "jmsConsumer") + public MessageListener jmsConsumer(KafkaTemplate kafkaTemplate) throws Exception { + return new AAIKafkaEventJMSConsumer(kafkaTemplate); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) + throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(connectionFactory); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(messageListener); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory producerFactory() throws Exception { + Map props = new HashMap<>(); + if (bootstrapServers == null) { + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } else { + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.RETRIES_CONFIG, retries); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + + if (saslJaasConfig == null) { + logger.info("Not using any authentication for kafka interaction"); + } else { + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) throws Exception { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public MessageProducer messageProducer(JmsTemplate jmsTemplate) { + return new AAIKafkaEventJMSProducer(jmsTemplate); + } + + @Bean + public NotificationService notificationService(LoaderFactory loaderFactory, + @Value("${schema.uri.base.path}") String basePath, + @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) { + return new NotificationService(loaderFactory, basePath, isDeltaEventsEnabled); + } +} diff --git a/aai-core/src/main/resources/logback.xml b/aai-core/src/main/resources/logback.xml index fc66b32e..ba5b3de8 100644 --- a/aai-core/src/main/resources/logback.xml +++ b/aai-core/src/main/resources/logback.xml @@ -166,7 +166,7 @@ ${eelfTransLogPattern} - + 1000 true @@ -258,7 +258,7 @@ - @@ -282,7 +282,7 @@ - @@ -366,9 +366,7 @@ - - - + diff --git a/aai-core/src/test/java/org/onap/aai/AAISetup.java b/aai-core/src/test/java/org/onap/aai/AAISetup.java index 16f21ff4..08a0e91b 100644 --- a/aai-core/src/test/java/org/onap/aai/AAISetup.java +++ b/aai-core/src/test/java/org/onap/aai/AAISetup.java @@ -40,6 +40,7 @@ import org.onap.aai.setup.AAIConfigTranslator; import org.onap.aai.setup.SchemaVersion; import org.onap.aai.setup.SchemaVersions; import org.onap.aai.util.AAIConstants; +import org.onap.aai.web.KafkaConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.test.context.ContextConfiguration; @@ -52,7 +53,8 @@ import org.springframework.test.context.web.WebAppConfiguration; @ContextConfiguration( classes = {ConfigConfiguration.class, AAIConfigTranslator.class, EdgeIngestor.class, EdgeSerializer.class, NodeIngestor.class, SpringContextAware.class, IntrospectionConfig.class, RestBeanConfig.class, - XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class, LoaderFactory.class, NotificationService.class}) + XmlFormatTransformerConfiguration.class, ValidationService.class, ValidationConfiguration.class, + KafkaConfig.class, LoaderFactory.class, NotificationService.class}) @TestPropertySource( properties = {"schema.uri.base.path = /aai", "schema.xsd.maxoccurs = 5000", "schema.translator.list=config", "schema.nodes.location=src/test/resources/onap/oxm", diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java new file mode 100644 index 00000000..c10260da --- /dev/null +++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java @@ -0,0 +1,97 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2024 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.kafka; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; + +import javax.ws.rs.core.Response; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.json.JSONObject; +import org.junit.Test; +import org.mockito.Mock; +import org.onap.aai.AAISetup; +import org.onap.aai.PayloadUtil; +import org.onap.aai.restcore.HttpMethod; +import org.skyscreamer.jsonassert.JSONAssert; +import org.skyscreamer.jsonassert.JSONCompareMode; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.TestPropertySource; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@ActiveProfiles("kafka") +@Import(KafkaTestConfiguration.class) +@EmbeddedKafka(partitions = 1, topics = { "AAI-EVENT" }) +@TestPropertySource( + properties = { + "jms.bind.address=tcp://localhost:61647", + "aai.events.enabled=true", + "spring.kafka.producer.retries=0", + "spring.kafka.producer.properties.sasl.jaas.config=#{null}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}" + }) +public class AAIKafkaEventIntegrationTest extends AAISetup { + + @Mock + private KafkaTemplate kafkaTemplate; + + @Autowired + MessageProducer messageProducer; + + @Autowired + private ConsumerFactory consumerFactory; + + @Test + public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived() + throws Exception { + Consumer consumer = consumerFactory.createConsumer(); + + consumer.subscribe(Collections.singletonList("AAI-EVENT")); + + String payload = PayloadUtil.getResourcePayload("aai-event.json"); + String expectedResponse = PayloadUtil.getExpectedPayload("aai-event.json"); + messageProducer.sendMessageToDefaultDestination(payload); + + ConsumerRecords consumerRecords = KafkaTestUtils.getRecords(consumer, 10000); + assertFalse(consumerRecords.isEmpty()); + consumerRecords.forEach(consumerRecord -> { + JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.NON_EXTENSIBLE); + }); + } + +} diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java deleted file mode 100644 index c72499c4..00000000 --- a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.onap.aai.kafka; - -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import javax.jms.TextMessage; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; -import org.onap.aai.PayloadUtil; -import org.springframework.core.env.Environment; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.util.ReflectionTestUtils; - -@RunWith(MockitoJUnitRunner.class) -@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) -public class AAIKafkaEventJMSConsumerTest { - - @Mock - private Environment environment; - - @Mock - private KafkaTemplate kafkaTemplate; - - private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer; - - @Before - public void setUp(){ - aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate); - } - - @Test - public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived() - throws Exception - { - TextMessage mockTextMessage = mock(TextMessage.class); - String payload = PayloadUtil.getResourcePayload("aai-event.json"); - - when(mockTextMessage.getText()).thenReturn(payload); - aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); - verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString()); - } - - @Test - public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{ - TextMessage mockTextMessage = mock(TextMessage.class); - String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json"); - when(mockTextMessage.getText()).thenReturn(payload); - aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); - } - - - @Test - public void onMessage_shouldHandleJSONException() throws Exception { - // Arrange - AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); - TextMessage mockTextMessage = mock(TextMessage.class); - ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate - - // Act - consumer.onMessage(mockTextMessage); - - // Assert - // Verify that exception is logged - } - - @Test - public void onMessage_shouldHandleGenericException() throws Exception { - // Arrange - AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); - TextMessage mockTextMessage = mock(TextMessage.class); - when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields - - // Act - consumer.onMessage(mockTextMessage); - - // Assert - // Verify that exception is logged - } - -} diff --git a/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java new file mode 100644 index 00000000..730699e6 --- /dev/null +++ b/aai-core/src/test/java/org/onap/aai/kafka/KafkaTestConfiguration.java @@ -0,0 +1,77 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2024 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.test.context.TestPropertySource; + +@TestConfiguration +public class KafkaTestConfiguration { + + @Value("${spring.embedded.kafka.brokers}") private String bootstrapAddress; + + private String groupId = "test-consumer"; + + @Bean + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(configs); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + bootstrapAddress); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + groupId); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } +} diff --git a/aai-core/src/test/resources/logback.xml b/aai-core/src/test/resources/logback.xml index 4c82c0bf..6acc77f0 100644 --- a/aai-core/src/test/resources/logback.xml +++ b/aai-core/src/test/resources/logback.xml @@ -163,7 +163,7 @@ ${eelfTransLogPattern} - + 1000 true @@ -255,7 +255,7 @@ - @@ -279,7 +279,7 @@ - @@ -363,9 +363,7 @@ - - - + diff --git a/aai-core/src/test/resources/payloads/expected/aai-event.json b/aai-core/src/test/resources/payloads/expected/aai-event.json new file mode 100644 index 00000000..86c67992 --- /dev/null +++ b/aai-core/src/test/resources/payloads/expected/aai-event.json @@ -0,0 +1,61 @@ +{ + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "object-group", + "top-entity-type": "object-group", + "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "UPDATE", + "sequence-number": "0", + "id": "23f12123-c326-48a7-b57e-e48746c295ea", + "source-name": "postman-api", + "version": "v28", + "timestamp": "20231207-12:14:44:757" + }, + "entity": { + "relationship-list": { + "relationship": [ + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193273958916", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193273958916", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913" + } + ] + }, + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193272330241", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193272330241", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803" + } + ] + } + ] + }, + "group-name": "Urban", + "resource-version": "1701951284582", + "group-type": "cell", + "object-group-id": "ric_cluster" + } +} -- cgit 1.2.3-korg