diff options
author | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-07-22 14:21:26 +0200 |
---|---|---|
committer | Fiete Ostkamp <Fiete.Ostkamp@telekom.de> | 2024-07-23 09:44:23 +0200 |
commit | 2513e6ea205ca1a6d1c7ba13b8b448ab649966c2 (patch) | |
tree | f0a8507c0d2b7f570952ec1c7a22db04ca9e9858 /aai-core/src/main/java | |
parent | 405369a8be85f53208cc97a44d8fb3942313e2e7 (diff) |
Make JMS-based messaging compatible with tracing
- use dependency injection instead of new Foo() for jms related classes
- inject interfaces and not their implementations
- add integration test that asserts message sending via JMS to Kafka [1]
[1] this also prepares removal of ActiveMQ as a middleman
Issue-ID: AAI-3932
Change-Id: Icbdd264f5b52adc72aa05046ed66d9bd5108c372
Signed-off-by: Fiete Ostkamp <Fiete.Ostkamp@telekom.de>
Diffstat (limited to 'aai-core/src/main/java')
5 files changed, 309 insertions, 318 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java index 731f3dfc..67f6842e 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java @@ -20,116 +20,114 @@ * ============LICENSE_END========================================================= */ - package org.onap.aai.kafka; - - import java.util.Map; - import java.util.Objects; - - import javax.jms.JMSException; - import javax.jms.Message; - import javax.jms.MessageListener; - import javax.jms.TextMessage; - - import org.json.JSONException; - import org.json.JSONObject; - import org.onap.aai.aailog.logs.AaiDmaapMetricLog; - import org.onap.aai.exceptions.AAIException; - import org.onap.aai.logging.AaiElsErrorCode; - import org.onap.aai.logging.ErrorLogHelper; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - import org.slf4j.MDC; - import org.springframework.core.env.Environment; - import org.springframework.kafka.core.KafkaTemplate; - - public class AAIKafkaEventJMSConsumer implements MessageListener { - - private static final String EVENT_TOPIC = "event-topic"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); - - private Environment environment; - private Map<String, String> mdcCopy; - private KafkaTemplate<String,String> kafkaTemplate; - - public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) { - super(); - mdcCopy = MDC.getCopyOfContextMap(); - Objects.nonNull(environment); - this.environment = environment; - this.kafkaTemplate=kafkaTemplate; - } - - @Override - public void onMessage(Message message) { - - if (kafkaTemplate == null) { - return; - } - - String jsmMessageTxt = ""; - String aaiEvent = ""; - JSONObject aaiEventHeader; - JSONObject joPayload; - String transactionId = ""; - String serviceName = ""; - String eventName = ""; - String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; - String errorDescription = ""; - - if (mdcCopy != null) { - MDC.setContextMap(mdcCopy); - } - - if (message instanceof TextMessage) { - AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); - try { - jsmMessageTxt = ((TextMessage) message).getText(); - JSONObject jo = new JSONObject(jsmMessageTxt); - if (jo.has("aaiEventPayload")) { - joPayload = jo.getJSONObject("aaiEventPayload"); - aaiEvent = joPayload.toString(); - } else { - return; - } - if (jo.getString(EVENT_TOPIC) != null) { - eventName = jo.getString(EVENT_TOPIC); - } - if (joPayload.has("event-header")) { - try { - aaiEventHeader = joPayload.getJSONObject("event-header"); - if (aaiEventHeader.has("id")) { - transactionId = aaiEventHeader.get("id").toString(); - } - if (aaiEventHeader.has("entity-link")) { - serviceName = aaiEventHeader.get("entity-link").toString(); - } - } catch (JSONException jexc) { - // ignore, this is just used for logging - } - } - metricLog.pre(eventName, aaiEvent, transactionId, serviceName); - - - if ("AAI-EVENT".equals(eventName)) { - // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); - kafkaTemplate.send(eventName,aaiEvent); - - } else { - LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); - } - } catch (JMSException | JSONException e) { - aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7350")); - } catch (Exception e) { - aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); - } finally { - metricLog.post(aaiElsErrorCode, errorDescription); - } - } - } - } -
\ No newline at end of file +package org.onap.aai.kafka; + +import java.util.Map; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.json.JSONException; +import org.json.JSONObject; +import org.onap.aai.aailog.logs.AaiDmaapMetricLog; +import org.onap.aai.exceptions.AAIException; +import org.onap.aai.logging.AaiElsErrorCode; +import org.onap.aai.logging.ErrorLogHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.kafka.core.KafkaTemplate; + + +public class AAIKafkaEventJMSConsumer implements MessageListener { + + private static final String EVENT_TOPIC = "event-topic"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); + + private Map<String, String> mdcCopy; + private final KafkaTemplate<String, String> kafkaTemplate; + + public AAIKafkaEventJMSConsumer(KafkaTemplate<String, String> kafkaTemplate) { + super(); + mdcCopy = MDC.getCopyOfContextMap(); + this.kafkaTemplate = kafkaTemplate; + } + + @Override + public void onMessage(Message message) { + + if (kafkaTemplate == null) { + return; + } + + String jmsMessageText = ""; + String aaiEvent = ""; + JSONObject aaiEventHeader; + JSONObject aaiEventPayload; + String transactionId = ""; + String serviceName = ""; + String topicName = ""; + String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; + String errorDescription = ""; + + if (mdcCopy != null) { + MDC.setContextMap(mdcCopy); + } + + if (message instanceof TextMessage) { + AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); + try { + jmsMessageText = ((TextMessage) message).getText(); + JSONObject jsonObject = new JSONObject(jmsMessageText); + if (jsonObject.has("aaiEventPayload")) { + aaiEventPayload = jsonObject.getJSONObject("aaiEventPayload"); + aaiEvent = aaiEventPayload.toString(); + } else { + return; + } + if (jsonObject.getString(EVENT_TOPIC) != null) { + topicName = jsonObject.getString(EVENT_TOPIC); + } + if (aaiEventPayload.has("event-header")) { + try { + aaiEventHeader = aaiEventPayload.getJSONObject("event-header"); + if (aaiEventHeader.has("id")) { + transactionId = aaiEventHeader.get("id").toString(); + } + if (aaiEventHeader.has("entity-link")) { + serviceName = aaiEventHeader.get("entity-link").toString(); + } + } catch (JSONException jexc) { + // ignore, this is just used for logging + } + } + metricLog.pre(topicName, aaiEvent, transactionId, serviceName); + + if ("AAI-EVENT".equals(topicName)) { + + kafkaTemplate.send(topicName, aaiEvent); + + } else { + LOGGER.error(String.format("%s|Event Topic invalid.", topicName)); + } + } catch (JMSException | JSONException e) { + aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7350")); + } catch (Exception e) { + e.printStackTrace(); + // LOGGER.error(); + LOGGER.error(e.getMessage()); + aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; + errorDescription = e.getMessage(); + String errorMessage = String.format("Error processing message: %s, message payload: %s", e.getMessage(), jmsMessageText); + ErrorLogHelper.logException(new AAIException("AAI_7304", errorMessage)); + } finally { + metricLog.post(aaiElsErrorCode, errorDescription); + } + } + } +} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java index 00cf677f..a46f2e99 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -22,44 +22,28 @@ package org.onap.aai.kafka; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.command.ActiveMQQueue; import org.json.JSONObject; import org.onap.aai.util.AAIConfig; -import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.core.JmsTemplate; +import org.springframework.stereotype.Service; +import lombok.RequiredArgsConstructor; + +@Service +@RequiredArgsConstructor public class AAIKafkaEventJMSProducer implements MessageProducer { - private JmsTemplate jmsTemplate; + @Value("${aai.events.enabled:true}") private boolean eventsEnabled; + private final JmsTemplate jmsTemplate; - public AAIKafkaEventJMSProducer() { - if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { - this.jmsTemplate = new JmsTemplate(); - String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); - this.jmsTemplate - .setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory(activeMqTcpUrl))); - this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); + public void sendMessageToDefaultDestination(String msg) { + if (eventsEnabled) { + jmsTemplate.convertAndSend(msg); } } public void sendMessageToDefaultDestination(JSONObject finalJson) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(finalJson.toString()); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } - } - - public void sendMessageToDefaultDestination(String msg) { - if (jmsTemplate != null) { - jmsTemplate.convertAndSend(msg); - CachingConnectionFactory ccf = (CachingConnectionFactory) this.jmsTemplate.getConnectionFactory(); - if (ccf != null) { - ccf.destroy(); - } - } + sendMessageToDefaultDestination(finalJson.toString()); } } diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java index 6f3e8883..127cf538 100644 --- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java @@ -39,14 +39,18 @@ import org.onap.aai.kafka.AAIKafkaEventJMSProducer; import org.onap.aai.kafka.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.core.env.Environment; +import org.springframework.jms.core.JmsTemplate; public class StoreNotificationEvent { private static final Logger LOGGER = LoggerFactory.getLogger(StoreNotificationEvent.class); - private MessageProducer messageProducer; + @Autowired JmsTemplate jmsTemplate; + + private final MessageProducer messageProducer; private String fromAppId = ""; private String transId = ""; private final String transactionId; @@ -59,12 +63,12 @@ public class StoreNotificationEvent { * Instantiates a new store notification event. */ public StoreNotificationEvent(String transactionId, String sourceOfTruth) { - this.messageProducer = new AAIKafkaEventJMSProducer(); + this.messageProducer = new AAIKafkaEventJMSProducer(jmsTemplate); this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; } - public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) { + public StoreNotificationEvent(MessageProducer producer, String transactionId, String sourceOfTruth) { this.messageProducer = producer; this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java index 6fbc297b..b2821b04 100644 --- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java +++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java @@ -20,7 +20,6 @@ package org.onap.aai.util.delta; -import com.google.gson.*; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -28,38 +27,35 @@ import java.util.Date; import java.util.Map; import org.onap.aai.db.props.AAIProperties; -import org.onap.aai.kafka.AAIKafkaEventJMSProducer; import org.onap.aai.kafka.MessageProducer; import org.onap.aai.util.AAIConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; -public class DeltaEvents { +import com.google.gson.FieldNamingPolicy; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; - private static final Logger LOGGER = LoggerFactory.getLogger(DeltaEvents.class); +public class DeltaEvents { private static final Gson gson = new GsonBuilder().setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES).create(); + private static final String eventVersion = "v1"; - private String transId; - private String sourceName; - private String eventVersion = "v1"; - private String schemaVersion; - private Map<String, ObjectDelta> objectDeltas; + private final String transId; + private final String sourceName; + private final String schemaVersion; + private final Map<String, ObjectDelta> objectDeltas; - private MessageProducer messageProducer; + @Autowired private MessageProducer messageProducer; public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) { - this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer()); - } - - public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas, - MessageProducer messageProducer) { - this.transId = transId; - this.sourceName = sourceName; - this.schemaVersion = schemaVersion; - this.objectDeltas = objectDeltas; - this.messageProducer = messageProducer; + this.transId = transId; + this.sourceName = sourceName; + this.schemaVersion = schemaVersion; + this.objectDeltas = objectDeltas; } public boolean triggerEvents() { @@ -98,7 +94,7 @@ public class DeltaEvents { header.addProperty("source-name", this.sourceName); header.addProperty("domain", this.getDomain()); header.addProperty("event-type", this.getEventType()); - header.addProperty("event-version", this.eventVersion); + header.addProperty("event-version", eventVersion); header.addProperty("schema-version", this.schemaVersion); header.addProperty("action", first.getAction().toString()); header.addProperty("entity-type", this.getEntityType(first)); @@ -126,7 +122,7 @@ public class DeltaEvents { /** * Given Long timestamp convert to format YYYYMMdd-HH:mm:ss:SSS - * + * * @param timestamp milliseconds since epoc * @return long timestamp in format YYYYMMdd-HH:mm:ss:SSS */ diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java index 71ae5b6b..05e4768d 100644 --- a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -18,158 +18,167 @@ * ============LICENSE_END========================================================= */ - package org.onap.aai.web; - - import java.util.HashMap; - import java.util.Map; - - import javax.annotation.PostConstruct; - - import org.apache.activemq.ActiveMQConnectionFactory; - import org.apache.activemq.broker.BrokerService; - import org.apache.activemq.command.ActiveMQQueue; - import org.apache.kafka.clients.producer.ProducerConfig; +package org.onap.aai.web; + +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.jms.ConnectionFactory; +import javax.jms.MessageListener; +import javax.jms.Queue; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.introspection.LoaderFactory; import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; +import org.onap.aai.rest.notification.NotificationService; import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - import org.springframework.beans.factory.annotation.Autowired; - import org.springframework.beans.factory.annotation.Value; - import org.springframework.context.ApplicationContext; - import org.springframework.context.annotation.Bean; - import org.springframework.context.annotation.Configuration; - import org.springframework.context.annotation.Profile; - import org.springframework.jms.connection.CachingConnectionFactory; - import org.springframework.jms.core.JmsTemplate; - import org.springframework.jms.listener.DefaultMessageListenerContainer; - import org.springframework.kafka.core.DefaultKafkaProducerFactory; - import org.springframework.kafka.core.KafkaTemplate; - import org.springframework.kafka.core.ProducerFactory; - - @Profile("kafka") - @Configuration - public class KafkaConfig { - - @Autowired - private ApplicationContext ctx; - - - @Value("${jms.bind.address}") - private String bindAddress; - - @Value("${spring.kafka.producer.bootstrap-servers}") - private String bootstrapServers; - - @Value("${spring.kafka.producer.properties.security.protocol}") - private String securityProtocol; - - @Value("${spring.kafka.producer.properties.sasl.mechanism}") - private String saslMechanism; - - @Value("${spring.kafka.producer.properties.sasl.jaas.config}") - private String saslJaasConfig; - - @Value("${spring.kafka.producer.retries}") - private Integer retries; - - private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); - - @PostConstruct - public void init() { - System.setProperty("activemq.tcp.url", bindAddress); - } - - @Bean(destroyMethod = "stop") - public BrokerService brokerService() throws Exception { - - BrokerService broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setSchedulerSupport(false); - broker.start(); - - return broker; - } - - @Bean(name = "connectionFactory") - public ActiveMQConnectionFactory activeMQConnectionFactory() { - return new ActiveMQConnectionFactory(bindAddress); - } - - @Bean - public CachingConnectionFactory cachingConnectionFactory() { - return new CachingConnectionFactory(activeMQConnectionFactory()); - } - - @Bean(name = "destinationQueue") - public ActiveMQQueue activeMQQueue() { - return new ActiveMQQueue("IN_QUEUE"); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - - jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); - jmsTemplate.setDefaultDestination(activeMQQueue()); - - return jmsTemplate; - } - - @Bean - public AAIKafkaEventJMSProducer jmsProducer() { - return new AAIKafkaEventJMSProducer(); - } - - @Bean(name = "jmsConsumer") - public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { - return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); - } - - @Bean - public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { - - DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); - - messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); - messageListenerContainer.setDestinationName("IN_QUEUE"); - messageListenerContainer.setMessageListener(jmsConsumer()); - - return messageListenerContainer; - } - - @Bean - public ProducerFactory<String, String> producerFactory() throws Exception { - Map<String, Object> props = new HashMap<>(); - if(bootstrapServers == null){ - logger.error("Environment Variable " + bootstrapServers + " is missing"); - throw new Exception("Environment Variable " + bootstrapServers + " is missing"); - } - else{ - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - } - if(saslJaasConfig == null){ - logger.info("Not using any authentication for kafka interaction"); - } - else{ - logger.info("Using authentication provided by kafka interaction"); - // Strimzi Kafka security properties - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("security.protocol", securityProtocol); - props.put("sasl.mechanism", saslMechanism); - props.put("sasl.jaas.config", saslJaasConfig); - props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); - props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); - } - - return new DefaultKafkaProducerFactory<>(props); - } - - @Bean - public KafkaTemplate<String, String> kafkaTemplate() throws Exception { - return new KafkaTemplate<>(producerFactory()); - } - } -
\ No newline at end of file +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +@Profile("kafka") +@Configuration +public class KafkaConfig { + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private String retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @ConditionalOnMissingBean + @Bean(name = "connectionFactory") + public ConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + @ConditionalOnMissingBean + public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory targetConnectionFactory) { + return new CachingConnectionFactory(targetConnectionFactory); + } + + @Bean(name = "destinationQueue") + public Queue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, Queue queue) { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(connectionFactory); + jmsTemplate.setDefaultDestination(queue); + + return jmsTemplate; + } + + @Bean(name = "jmsConsumer") + public MessageListener jmsConsumer(KafkaTemplate<String, String> kafkaTemplate) throws Exception { + return new AAIKafkaEventJMSConsumer(kafkaTemplate); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory, MessageListener messageListener) + throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(connectionFactory); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(messageListener); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory<String, String> producerFactory() throws Exception { + Map<String, Object> props = new HashMap<>(); + if (bootstrapServers == null) { + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } else { + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.RETRIES_CONFIG, retries); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); + + if (saslJaasConfig == null) { + logger.info("Not using any authentication for kafka interaction"); + } else { + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) throws Exception { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public MessageProducer messageProducer(JmsTemplate jmsTemplate) { + return new AAIKafkaEventJMSProducer(jmsTemplate); + } + + @Bean + public NotificationService notificationService(LoaderFactory loaderFactory, + @Value("${schema.uri.base.path}") String basePath, + @Value("${delta.events.enabled:false}") boolean isDeltaEventsEnabled) { + return new NotificationService(loaderFactory, basePath, isDeltaEventsEnabled); + } +} |