summaryrefslogtreecommitdiffstats
path: root/aai-core/src/main/java
diff options
context:
space:
mode:
authorFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-07-22 14:21:26 +0200
committerFiete Ostkamp <Fiete.Ostkamp@telekom.de>2024-07-23 09:44:23 +0200
commit2513e6ea205ca1a6d1c7ba13b8b448ab649966c2 (patch)
treef0a8507c0d2b7f570952ec1c7a22db04ca9e9858 /aai-core/src/main/java
parent405369a8be85f53208cc97a44d8fb3942313e2e7 (diff)
Make JMS-based messaging compatible with tracing
- use dependency injection instead of new Foo() for jms related classes - inject interfaces and not their implementations - add integration test that asserts message sending via JMS to Kafka [1] [1] this also prepares removal of ActiveMQ as a middleman Issue-ID: AAI-3932 Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372 Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
Diffstat (limited to 'aai-core/src/main/java')
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java224
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java40
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java10
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java40
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java313
5 files changed, 309 insertions, 318 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
index 731f3dfc..67f6842e 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
@@ -20,116 +20,114 @@
* ============LICENSE_END=========================================================
*/
- package org.onap.aai.kafka;
-
- import java.util.Map;
- import java.util.Objects;
-
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
-
- import org.json.JSONException;
- import org.json.JSONObject;
- import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
- import org.onap.aai.exceptions.AAIException;
- import org.onap.aai.logging.AaiElsErrorCode;
- import org.onap.aai.logging.ErrorLogHelper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.slf4j.MDC;
- import org.springframework.core.env.Environment;
- import org.springframework.kafka.core.KafkaTemplate;
-
- public class AAIKafkaEventJMSConsumer implements MessageListener {
-
- private static final String EVENT_TOPIC = "event-topic";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
-
- private Environment environment;
- private Map<String, String> mdcCopy;
- private KafkaTemplate<String,String> kafkaTemplate;
-
- public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
- super();
- mdcCopy = MDC.getCopyOfContextMap();
- Objects.nonNull(environment);
- this.environment = environment;
- this.kafkaTemplate=kafkaTemplate;
- }
-
- @Override
- public void onMessage(Message message) {
-
- if (kafkaTemplate == null) {
- return;
- }
-
- String jsmMessageTxt = "";
- String aaiEvent = "";
- JSONObject aaiEventHeader;
- JSONObject joPayload;
- String transactionId = "";
- String serviceName = "";
- String eventName = "";
- String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
- String errorDescription = "";
-
- if (mdcCopy != null) {
- MDC.setContextMap(mdcCopy);
- }
-
- if (message instanceof TextMessage) {
- AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
- try {
- jsmMessageTxt = ((TextMessage) message).getText();
- JSONObject jo = new JSONObject(jsmMessageTxt);
- if (jo.has("aaiEventPayload")) {
- joPayload = jo.getJSONObject("aaiEventPayload");
- aaiEvent = joPayload.toString();
- } else {
- return;
- }
- if (jo.getString(EVENT_TOPIC) != null) {
- eventName = jo.getString(EVENT_TOPIC);
- }
- if (joPayload.has("event-header")) {
- try {
- aaiEventHeader = joPayload.getJSONObject("event-header");
- if (aaiEventHeader.has("id")) {
- transactionId = aaiEventHeader.get("id").toString();
- }
- if (aaiEventHeader.has("entity-link")) {
- serviceName = aaiEventHeader.get("entity-link").toString();
- }
- } catch (JSONException jexc) {
- // ignore, this is just used for logging
- }
- }
- metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-
-
- if ("AAI-EVENT".equals(eventName)) {
- // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
- kafkaTemplate.send(eventName,aaiEvent);
-
- } else {
- LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
- }
- } catch (JMSException | JSONException e) {
- aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7350"));
- } catch (Exception e) {
- aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
- } finally {
- metricLog.post(aaiElsErrorCode, errorDescription);
- }
- }
- }
- }
- \ No newline at end of file
+package org.onap.aai.kafka;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+import org.onap.aai.exceptions.AAIException;
+import org.onap.aai.logging.AaiElsErrorCode;
+import org.onap.aai.logging.ErrorLogHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.kafka.core.KafkaTemplate;
+
+
+public class AAIKafkaEventJMSConsumer implements MessageListener {
+
+ private static final String EVENT_TOPIC = "event-topic";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+
+ private Map<String, String> mdcCopy;
+ private final KafkaTemplate<String, String> kafkaTemplate;
+
+ public AAIKafkaEventJMSConsumer(KafkaTemplate<String, String> kafkaTemplate) {
+ super();
+ mdcCopy = MDC.getCopyOfContextMap();
+ this.kafkaTemplate = kafkaTemplate;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+
+ if (kafkaTemplate == null) {
+ return;
+ }
+
+ String jmsMessageText = "";
+ String aaiEvent = "";
+ JSONObject aaiEventHeader;
+ JSONObject aaiEventPayload;
+ String transactionId = "";
+ String serviceName = "";
+ String topicName = "";
+ String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+ String errorDescription = "";
+
+ if (mdcCopy != null) {
+ MDC.setContextMap(mdcCopy);
+ }
+
+ if (message instanceof TextMessage) {
+ AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+ try {
+ jmsMessageText = ((TextMessage) message).getText();
+ JSONObject jsonObject = new JSONObject(jmsMessageText);
+ if (jsonObject.has("aaiEventPayload")) {
+ aaiEventPayload = jsonObject.getJSONObject("aaiEventPayload");
+ aaiEvent = aaiEventPayload.toString();
+ } else {
+ return;
+ }
+ if (jsonObject.getString(EVENT_TOPIC) != null) {
+ topicName = jsonObject.getString(EVENT_TOPIC);
+ }
+ if (aaiEventPayload.has("event-header")) {
+ try {
+ aaiEventHeader = aaiEventPayload.getJSONObject("event-header");
+ if (aaiEventHeader.has("id")) {
+ transactionId = aaiEventHeader.get("id").toString();
+ }
+ if (aaiEventHeader.has("entity-link")) {
+ serviceName = aaiEventHeader.get("entity-link").toString();
+ }
+ } catch (JSONException jexc) {
+ // ignore, this is just used for logging
+ }
+ }
+ metricLog.pre(topicName, aaiEvent, transactionId, serviceName);
+
+ if ("AAI-EVENT".equals(topicName)) {
+
+ kafkaTemplate.send(topicName, aaiEvent);
+
+ } else {
+ LOGGER.error(String.format("%s|Event Topic invalid.", topicName));
+ }
+ } catch (JMSException | JSONException e) {
+ aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7350"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ // LOGGER.error();
+ LOGGER.error(e.getMessage());
+ aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+ errorDescription = e.getMessage();
+ String errorMessage = String.format("Error processing message: %s, message payload: %s", e.getMessage(), jmsMessageText);
+ ErrorLogHelper.logException(new AAIException("AAI_7304", errorMessage));
+ } finally {
+ metricLog.post(aaiElsErrorCode, errorDescription);
+ }
+ }
+ }
+}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
index 00cf677f..a46f2e99 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
@@ -22,44 +22,28 @@
package org.onap.aai.kafka;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.command.ActiveMQQueue;
import org.json.JSONObject;
import org.onap.aai.util.AAIConfig;
-import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.stereotype.Service;
+import lombok.RequiredArgsConstructor;
+
+@Service
+@RequiredArgsConstructor
public class AAIKafkaEventJMSProducer implements MessageProducer {
- private JmsTemplate jmsTemplate;
+ @Value("${aai.events.enabled:true}") private boolean eventsEnabled;
+ private final JmsTemplate jmsTemplate;
- public AAIKafkaEventJMSProducer() {
- if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
- this.jmsTemplate = new JmsTemplate();
- String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
- this.jmsTemplate
- .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl)));
- this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE"));
+ public void sendMessageToDefaultDestination(String msg) {
+ if (eventsEnabled) {
+ jmsTemplate.convertAndSend(msg);
}
}
public void sendMessageToDefaultDestination(JSONObject finalJson) {
- if (jmsTemplate != null) {
- jmsTemplate.convertAndSend(finalJson.toString());
- CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
- if (ccf != null) {
- ccf.destroy();
- }
- }
- }
-
- public void sendMessageToDefaultDestination(String msg) {
- if (jmsTemplate != null) {
- jmsTemplate.convertAndSend(msg);
- CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory();
- if (ccf != null) {
- ccf.destroy();
- }
- }
+ sendMessageToDefaultDestination(finalJson.toString());
}
}
diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
index 6f3e8883..127cf538 100644
--- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
+++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
@@ -39,14 +39,18 @@ import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
import org.onap.aai.kafka.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment;
+import org.springframework.jms.core.JmsTemplate;
public class StoreNotificationEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(StoreNotificationEvent.class);
- private MessageProducer messageProducer;
+ @Autowired JmsTemplate jmsTemplate;
+
+ private final MessageProducer messageProducer;
private String fromAppId = "";
private String transId = "";
private final String transactionId;
@@ -59,12 +63,12 @@ public class StoreNotificationEvent {
* Instantiates a new store notification event.
*/
public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
- this.messageProducer = new AAIKafkaEventJMSProducer();
+ this.messageProducer = new AAIKafkaEventJMSProducer(jmsTemplate);
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
}
- public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+ public StoreNotificationEvent(MessageProducer producer, String transactionId, String sourceOfTruth) {
this.messageProducer = producer;
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
index 6fbc297b..b2821b04 100644
--- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
+++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
@@ -20,7 +20,6 @@
package org.onap.aai.util.delta;
-import com.google.gson.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -28,38 +27,35 @@ import java.util.Date;
import java.util.Map;
import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
import org.onap.aai.kafka.MessageProducer;
import org.onap.aai.util.AAIConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
-public class DeltaEvents {
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
- private static final Logger LOGGER = LoggerFactory.getLogger(DeltaEvents.class);
+public class DeltaEvents {
private static final Gson gson =
new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES).create();
+ private static final String eventVersion = "v1";
- private String transId;
- private String sourceName;
- private String eventVersion = "v1";
- private String schemaVersion;
- private Map<String, ObjectDelta> objectDeltas;
+ private final String transId;
+ private final String sourceName;
+ private final String schemaVersion;
+ private final Map<String, ObjectDelta> objectDeltas;
- private MessageProducer messageProducer;
+ @Autowired private MessageProducer messageProducer;
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
- this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
- }
-
- public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
- MessageProducer messageProducer) {
- this.transId = transId;
- this.sourceName = sourceName;
- this.schemaVersion = schemaVersion;
- this.objectDeltas = objectDeltas;
- this.messageProducer = messageProducer;
+ this.transId = transId;
+ this.sourceName = sourceName;
+ this.schemaVersion = schemaVersion;
+ this.objectDeltas = objectDeltas;
}
public boolean triggerEvents() {
@@ -98,7 +94,7 @@ public class DeltaEvents {
header.addProperty("source-name", this.sourceName);
header.addProperty("domain", this.getDomain());
header.addProperty("event-type", this.getEventType());
- header.addProperty("event-version", this.eventVersion);
+ header.addProperty("event-version", eventVersion);
header.addProperty("schema-version", this.schemaVersion);
header.addProperty("action", first.getAction().toString());
header.addProperty("entity-type", this.getEntityType(first));
@@ -126,7 +122,7 @@ public class DeltaEvents {
/**
* Given Long timestamp convert to format YYYYMMdd-HH:mm:ss:SSS
- *
+ *
* @param timestamp milliseconds since epoc
* @return long timestamp in format YYYYMMdd-HH:mm:ss:SSS
*/
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
index 71ae5b6b..05e4768d 100644
--- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
@@ -18,158 +18,167 @@
* ============LICENSE_END=========================================================
*/
- package org.onap.aai.web;
-
- import java.util.HashMap;
- import java.util.Map;
-
- import javax.annotation.PostConstruct;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.broker.BrokerService;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.kafka.clients.producer.ProducerConfig;
+package org.onap.aai.web;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.introspection.LoaderFactory;
import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
+import org.onap.aai.rest.notification.NotificationService;
import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.context.annotation.Profile;
- import org.springframework.jms.connection.CachingConnectionFactory;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.listener.DefaultMessageListenerContainer;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
-
- @Profile("kafka")
- @Configuration
- public class KafkaConfig {
-
- @Autowired
- private ApplicationContext ctx;
-
-
- @Value("${jms.bind.address}")
- private String bindAddress;
-
- @Value("${spring.kafka.producer.bootstrap-servers}")
- private String bootstrapServers;
-
- @Value("${spring.kafka.producer.properties.security.protocol}")
- private String securityProtocol;
-
- @Value("${spring.kafka.producer.properties.sasl.mechanism}")
- private String saslMechanism;
-
- @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
- private String saslJaasConfig;
-
- @Value("${spring.kafka.producer.retries}")
- private Integer retries;
-
- private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
-
- @PostConstruct
- public void init() {
- System.setProperty("activemq.tcp.url", bindAddress);
- }
-
- @Bean(destroyMethod = "stop")
- public BrokerService brokerService() throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setSchedulerSupport(false);
- broker.start();
-
- return broker;
- }
-
- @Bean(name = "connectionFactory")
- public ActiveMQConnectionFactory activeMQConnectionFactory() {
- return new ActiveMQConnectionFactory(bindAddress);
- }
-
- @Bean
- public CachingConnectionFactory cachingConnectionFactory() {
- return new CachingConnectionFactory(activeMQConnectionFactory());
- }
-
- @Bean(name = "destinationQueue")
- public ActiveMQQueue activeMQQueue() {
- return new ActiveMQQueue("IN_QUEUE");
- }
-
- @Bean
- public JmsTemplate jmsTemplate() {
- JmsTemplate jmsTemplate = new JmsTemplate();
-
- jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
- jmsTemplate.setDefaultDestination(activeMQQueue());
-
- return jmsTemplate;
- }
-
- @Bean
- public AAIKafkaEventJMSProducer jmsProducer() {
- return new AAIKafkaEventJMSProducer();
- }
-
- @Bean(name = "jmsConsumer")
- public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
- return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
- }
-
- @Bean
- public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-
- DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-
- messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
- messageListenerContainer.setDestinationName("IN_QUEUE");
- messageListenerContainer.setMessageListener(jmsConsumer());
-
- return messageListenerContainer;
- }
-
- @Bean
- public ProducerFactory<String, String> producerFactory() throws Exception {
- Map<String, Object> props = new HashMap<>();
- if(bootstrapServers == null){
- logger.error("Environment Variable " + bootstrapServers + " is missing");
- throw new Exception("Environment Variable " + bootstrapServers + " is missing");
- }
- else{
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- }
- if(saslJaasConfig == null){
- logger.info("Not using any authentication for kafka interaction");
- }
- else{
- logger.info("Using authentication provided by kafka interaction");
- // Strimzi Kafka security properties
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("security.protocol", securityProtocol);
- props.put("sasl.mechanism", saslMechanism);
- props.put("sasl.jaas.config", saslJaasConfig);
- props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
- props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
- }
-
- return new DefaultKafkaProducerFactory<>(props);
- }
-
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
- return new KafkaTemplate<>(producerFactory());
- }
- }
- \ No newline at end of file
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+@Profile("kafka")
+@Configuration
+public class KafkaConfig {
+
+ @Value("${jms.bind.address}")
+ private String bindAddress;
+
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.producer.properties.security.protocol}")
+ private String securityProtocol;
+
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+ private String saslJaasConfig;
+
+ @Value("${spring.kafka.producer.retries}")
+ private String retries;
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+
+ @PostConstruct
+ public void init() {
+ System.setProperty("activemq.tcp.url", bindAddress);
+ }
+
+ @Bean(destroyMethod = "stop")
+ public BrokerService brokerService() throws Exception {
+
+ BrokerService broker = new BrokerService();
+ broker.addConnector(bindAddress);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setSchedulerSupport(false);
+ broker.start();
+
+ return broker;
+ }
+
+ @ConditionalOnMissingBean
+ @Bean(name = "connectionFactory")
+ public ConnectionFactory activeMQConnectionFactory() {
+ return new ActiveMQConnectionFactory(bindAddress);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
+ return new CachingConnectionFactory(targetConnectionFactory);
+ }
+
+ @Bean(name = "destinationQueue")
+ public Queue activeMQQueue() {
+ return new ActiveMQQueue("IN_QUEUE");
+ }
+
+ @Bean
+ public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, Queue queue) {
+ JmsTemplate jmsTemplate = new JmsTemplate();
+
+ jmsTemplate.setConnectionFactory(connectionFactory);
+ jmsTemplate.setDefaultDestination(queue);
+
+ return jmsTemplate;
+ }
+
+ @Bean(name = "jmsConsumer")
+ public MessageListener jmsConsumer(KafkaTemplate<String, String> kafkaTemplate) throws Exception {
+ return new AAIKafkaEventJMSConsumer(kafkaTemplate);
+ }
+
+ @Bean
+ public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener)
+ throws Exception {
+
+ DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+
+ messageListenerContainer.setConnectionFactory(connectionFactory);
+ messageListenerContainer.setDestinationName("IN_QUEUE");
+ messageListenerContainer.setMessageListener(messageListener);
+
+ return messageListenerContainer;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ if (bootstrapServers == null) {
+ logger.error("Environment Variable " + bootstrapServers + " is missing");
+ throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+ } else {
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put(ProducerConfig.RETRIES_CONFIG, retries);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+
+ if (saslJaasConfig == null) {
+ logger.info("Not using any authentication for kafka interaction");
+ } else {
+ logger.info("Using authentication provided by kafka interaction");
+ // Strimzi Kafka security properties
+ props.put("security.protocol", securityProtocol);
+ props.put("sasl.mechanism", saslMechanism);
+ props.put("sasl.jaas.config", saslJaasConfig);
+ }
+
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) throws Exception {
+ return new KafkaTemplate<>(producerFactory);
+ }
+
+ @Bean
+ public MessageProducer messageProducer(JmsTemplate jmsTemplate) {
+ return new AAIKafkaEventJMSProducer(jmsTemplate);
+ }
+
+ @Bean
+ public NotificationService notificationService(LoaderFactory loaderFactory,
+ @Value("${schema.uri.base.path}") String basePath,
+ @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) {
+ return new NotificationService(loaderFactory, basePath, isDeltaEventsEnabled);
+ }
+}