diff options
author | Singh <soumya.e.singh@accenture.com> | 2024-04-09 17:38:07 +0530 |
---|---|---|
committer | Singh <soumya.e.singh@accenture.com> | 2024-04-10 15:36:25 +0530 |
commit | dff80ed76c8e0e6416e0688541f3094db3ca260a (patch) | |
tree | 72b5210b43ddc9d41c4aa7e4fcb8cfef040db067 /aai-core/src/main/java | |
parent | dd7e9878066b0de0d8c0acddf58aec5702e83115 (diff) |
Remove DMaaP dependency from AAI-Common
- Remove Dmaap dependency in AAI-Common and replace it with Kafka.
Issue-ID: AAI-3792
Change-Id: If3fd5c3bdc2448f7e260a26000b02a510c80d7fb
Signed-off-by: Singh <soumya.e.singh@accenture.com>
Diffstat (limited to 'aai-core/src/main/java')
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java | 146 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java | 135 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java) | 6 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java) | 2 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java | 16 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java | 6 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java | 125 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java | 95 | ||||
-rw-r--r-- | aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java | 175 |
9 files changed, 325 insertions, 381 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java deleted file mode 100644 index d3addebb..00000000 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Modifications Copyright © 2018 IBM. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.dmaap; - -import java.util.Map; -import java.util.Objects; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.json.JSONException; -import org.json.JSONObject; -import org.onap.aai.aailog.logs.AaiDmaapMetricLog; -import org.onap.aai.exceptions.AAIException; -import org.onap.aai.logging.AaiElsErrorCode; -import org.onap.aai.logging.ErrorLogHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.core.env.Environment; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.web.client.RestTemplate; - -public class AAIDmaapEventJMSConsumer implements MessageListener { - - private static final String EVENT_TOPIC = "event-topic"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class); - - private RestTemplate restTemplate; - - private HttpHeaders httpHeaders; - - private Environment environment; - private Map<String, String> mdcCopy; - - public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) { - super(); - mdcCopy = MDC.getCopyOfContextMap(); - Objects.nonNull(environment); - Objects.nonNull(restTemplate); - Objects.nonNull(httpHeaders); - this.environment = environment; - this.restTemplate = restTemplate; - this.httpHeaders = httpHeaders; - } - - @Override - public void onMessage(Message message) { - - if (restTemplate == null) { - return; - } - - String jsmMessageTxt = ""; - String aaiEvent = ""; - JSONObject aaiEventHeader; - JSONObject joPayload; - String transactionId = ""; - String serviceName = ""; - String eventName = ""; - String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; - String errorDescription = ""; - - if (mdcCopy != null) { - MDC.setContextMap(mdcCopy); - } - - if (message instanceof TextMessage) { - AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); - try { - jsmMessageTxt = ((TextMessage) message).getText(); - JSONObject jo = new JSONObject(jsmMessageTxt); - if (jo.has("aaiEventPayload")) { - joPayload = jo.getJSONObject("aaiEventPayload"); - aaiEvent = joPayload.toString(); - } else { - return; - } - if (jo.getString(EVENT_TOPIC) != null) { - eventName = jo.getString(EVENT_TOPIC); - } - if (joPayload.has("event-header")) { - try { - aaiEventHeader = joPayload.getJSONObject("event-header"); - if (aaiEventHeader.has("id")) { - transactionId = aaiEventHeader.get("id").toString(); - } - if (aaiEventHeader.has("entity-link")) { - serviceName = aaiEventHeader.get("entity-link").toString(); - } - } catch (JSONException jexc) { - // ignore, this is just used for logging - } - } - metricLog.pre(eventName, aaiEvent, transactionId, serviceName); - - HttpEntity<String> httpEntity = new HttpEntity<String>(aaiEvent, httpHeaders); - - String transportType = environment.getProperty("dmaap.ribbon.transportType", "http"); - String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers"); - String endpoint = "/events/" + eventName; - - if ("AAI-EVENT".equals(eventName)) { - restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); - } else { - LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); - } - } catch (JMSException | JSONException e) { - aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7350")); - } catch (Exception e) { - aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); - } finally { - metricLog.post(aaiElsErrorCode, errorDescription); - } - } - } -} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java new file mode 100644 index 00000000..731f3dfc --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java @@ -0,0 +1,135 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.kafka; + + import java.util.Map; + import java.util.Objects; + + import javax.jms.JMSException; + import javax.jms.Message; + import javax.jms.MessageListener; + import javax.jms.TextMessage; + + import org.json.JSONException; + import org.json.JSONObject; + import org.onap.aai.aailog.logs.AaiDmaapMetricLog; + import org.onap.aai.exceptions.AAIException; + import org.onap.aai.logging.AaiElsErrorCode; + import org.onap.aai.logging.ErrorLogHelper; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.MDC; + import org.springframework.core.env.Environment; + import org.springframework.kafka.core.KafkaTemplate; + + public class AAIKafkaEventJMSConsumer implements MessageListener { + + private static final String EVENT_TOPIC = "event-topic"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); + + private Environment environment; + private Map<String, String> mdcCopy; + private KafkaTemplate<String,String> kafkaTemplate; + + public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) { + super(); + mdcCopy = MDC.getCopyOfContextMap(); + Objects.nonNull(environment); + this.environment = environment; + this.kafkaTemplate=kafkaTemplate; + } + + @Override + public void onMessage(Message message) { + + if (kafkaTemplate == null) { + return; + } + + String jsmMessageTxt = ""; + String aaiEvent = ""; + JSONObject aaiEventHeader; + JSONObject joPayload; + String transactionId = ""; + String serviceName = ""; + String eventName = ""; + String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; + String errorDescription = ""; + + if (mdcCopy != null) { + MDC.setContextMap(mdcCopy); + } + + if (message instanceof TextMessage) { + AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); + try { + jsmMessageTxt = ((TextMessage) message).getText(); + JSONObject jo = new JSONObject(jsmMessageTxt); + if (jo.has("aaiEventPayload")) { + joPayload = jo.getJSONObject("aaiEventPayload"); + aaiEvent = joPayload.toString(); + } else { + return; + } + if (jo.getString(EVENT_TOPIC) != null) { + eventName = jo.getString(EVENT_TOPIC); + } + if (joPayload.has("event-header")) { + try { + aaiEventHeader = joPayload.getJSONObject("event-header"); + if (aaiEventHeader.has("id")) { + transactionId = aaiEventHeader.get("id").toString(); + } + if (aaiEventHeader.has("entity-link")) { + serviceName = aaiEventHeader.get("entity-link").toString(); + } + } catch (JSONException jexc) { + // ignore, this is just used for logging + } + } + metricLog.pre(eventName, aaiEvent, transactionId, serviceName); + + + if ("AAI-EVENT".equals(eventName)) { + // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); + kafkaTemplate.send(eventName,aaiEvent); + + } else { + LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); + } + } catch (JMSException | JSONException e) { + aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7350")); + } catch (Exception e) { + aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); + } finally { + metricLog.post(aaiElsErrorCode, errorDescription); + } + } + } + } +
\ No newline at end of file diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java index 3d675efe..00cf677f 100644 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.aai.dmaap; +package org.onap.aai.kafka; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; @@ -29,11 +29,11 @@ import org.onap.aai.util.AAIConfig; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; -public class AAIDmaapEventJMSProducer implements MessageProducer { +public class AAIKafkaEventJMSProducer implements MessageProducer { private JmsTemplate jmsTemplate; - public AAIDmaapEventJMSProducer() { + public AAIKafkaEventJMSProducer() { if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { this.jmsTemplate = new JmsTemplate(); String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java index da70d505..d6a491ef 100644 --- a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.aai.dmaap; +package org.onap.aai.kafka; import org.json.JSONObject; diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java index 598ef231..6f3e8883 100644 --- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java @@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; import org.json.JSONException; import org.json.JSONObject; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; import org.onap.aai.domain.notificationEvent.NotificationEvent; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; import org.onap.aai.introspection.exceptions.AAIUnknownObjectException; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -59,12 +59,12 @@ public class StoreNotificationEvent { * Instantiates a new store notification event. */ public StoreNotificationEvent(String transactionId, String sourceOfTruth) { - this.messageProducer = new AAIDmaapEventJMSProducer(); + this.messageProducer = new AAIKafkaEventJMSProducer(); this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; } - public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) { + public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) { this.messageProducer = producer; this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; @@ -139,7 +139,7 @@ public class StoreNotificationEvent { try { PojoUtils pu = new PojoUtils(); String entityJson = pu.getJsonFromObject(ne); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -227,7 +227,7 @@ public class StoreNotificationEvent { marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false); marshaller.marshal(notificationEvent, result); - this.sendToDmaapJmsQueue(result.toString()); + this.sendToKafkaJmsQueue(result.toString()); } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -380,7 +380,7 @@ public class StoreNotificationEvent { notificationEvent.setValue("entity", obj.getUnderlyingObject()); String entityJson = notificationEvent.marshal(false); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (JSONException e) { throw new AAIException("AAI_7350", e); @@ -389,7 +389,7 @@ public class StoreNotificationEvent { } } - private void sendToDmaapJmsQueue(String entityString) throws JSONException { + private void sendToKafkaJmsQueue(String entityString) throws JSONException { JSONObject entityJsonObject = new JSONObject(entityString); diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java index 10d47cd6..6fbc297b 100644 --- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java +++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java @@ -28,8 +28,8 @@ import java.util.Date; import java.util.Map; import org.onap.aai.db.props.AAIProperties; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.onap.aai.util.AAIConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class DeltaEvents { private MessageProducer messageProducer; public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) { - this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer()); + this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer()); } public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas, diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java deleted file mode 100644 index 078ca963..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import javax.annotation.PostConstruct; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; -import org.springframework.http.HttpHeaders; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.listener.DefaultMessageListenerContainer; -import org.springframework.web.client.RestTemplate; - -@Profile("dmaap") -@Configuration -public class DmaapConfig { - - @Autowired - private ApplicationContext ctx; - - @Autowired - @Qualifier("dmaapRestTemplate") - private RestTemplate dmaapRestTemplate; - - @Autowired - @Qualifier("dmaapHeaders") - private HttpHeaders dmaapHeaders; - - @Value("${jms.bind.address}") - private String bindAddress; - - @PostConstruct - public void init() { - System.setProperty("activemq.tcp.url", bindAddress); - } - - @Bean(destroyMethod = "stop") - public BrokerService brokerService() throws Exception { - - BrokerService broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setSchedulerSupport(false); - broker.start(); - - return broker; - } - - @Bean(name = "connectionFactory") - public ActiveMQConnectionFactory activeMQConnectionFactory() { - return new ActiveMQConnectionFactory(bindAddress); - } - - @Bean - public CachingConnectionFactory cachingConnectionFactory() { - return new CachingConnectionFactory(activeMQConnectionFactory()); - } - - @Bean(name = "destinationQueue") - public ActiveMQQueue activeMQQueue() { - return new ActiveMQQueue("IN_QUEUE"); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - - jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); - jmsTemplate.setDefaultDestination(activeMQQueue()); - - return jmsTemplate; - } - - @Bean - public AAIDmaapEventJMSProducer jmsProducer() { - return new AAIDmaapEventJMSProducer(); - } - - @Bean(name = "jmsConsumer") - public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception { - return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders); - } - - @Bean - public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { - - DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); - - messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); - messageListenerContainer.setDestinationName("IN_QUEUE"); - messageListenerContainer.setMessageListener(jmsConsumer()); - - return messageListenerContainer; - } -} diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java deleted file mode 100644 index cbd9b105..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import java.io.UnsupportedEncodingException; -import java.util.Base64; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.web.client.RestTemplate; - -@Configuration -public class EventClientPublisher { - - private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class); - - @Value("${dmaap.ribbon.listOfServers:}") - private String hosts; - - @Value("${dmaap.ribbon.username:}") - private String username; - - @Value("${dmaap.ribbon.password:}") - private String password; - - @Value("${dmaap.ribbon.topic:AAI-EVENT}") - private String topic; - - @Value("${dmaap.ribbon.batchSize:100}") - private int maxBatchSize; - - @Value("${dmaap.ribbon.maxAgeMs:250}") - private int maxAgeMs; - - @Value("${dmaap.ribbon.delayBetweenBatches:100}") - private int delayBetweenBatches; - - @Value("${dmaap.ribbon.protocol:http}") - private String protocol; - - @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}") - private String tranportType; - - @Value("${dmaap.ribbon.contentType:application/json}") - private String contentType; - - @Bean(name = "dmaapRestTemplate") - public RestTemplate dmaapRestTemplate() { - return new RestTemplate(); - } - - @Bean(name = "dmaapHeaders") - public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - - if (username != null && password != null) { - - if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) { - - byte[] userPass = (username + ":" + password).getBytes("UTF-8"); - - httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass)); - } - } - - return httpHeaders; - } - -} diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java new file mode 100644 index 00000000..71ae5b6b --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -0,0 +1,175 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.web; + + import java.util.HashMap; + import java.util.Map; + + import javax.annotation.PostConstruct; + + import org.apache.activemq.ActiveMQConnectionFactory; + import org.apache.activemq.broker.BrokerService; + import org.apache.activemq.command.ActiveMQQueue; + import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.beans.factory.annotation.Value; + import org.springframework.context.ApplicationContext; + import org.springframework.context.annotation.Bean; + import org.springframework.context.annotation.Configuration; + import org.springframework.context.annotation.Profile; + import org.springframework.jms.connection.CachingConnectionFactory; + import org.springframework.jms.core.JmsTemplate; + import org.springframework.jms.listener.DefaultMessageListenerContainer; + import org.springframework.kafka.core.DefaultKafkaProducerFactory; + import org.springframework.kafka.core.KafkaTemplate; + import org.springframework.kafka.core.ProducerFactory; + + @Profile("kafka") + @Configuration + public class KafkaConfig { + + @Autowired + private ApplicationContext ctx; + + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private Integer retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @Bean(name = "connectionFactory") + public ActiveMQConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + public CachingConnectionFactory cachingConnectionFactory() { + return new CachingConnectionFactory(activeMQConnectionFactory()); + } + + @Bean(name = "destinationQueue") + public ActiveMQQueue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); + jmsTemplate.setDefaultDestination(activeMQQueue()); + + return jmsTemplate; + } + + @Bean + public AAIKafkaEventJMSProducer jmsProducer() { + return new AAIKafkaEventJMSProducer(); + } + + @Bean(name = "jmsConsumer") + public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { + return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(jmsConsumer()); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory<String, String> producerFactory() throws Exception { + Map<String, Object> props = new HashMap<>(); + if(bootstrapServers == null){ + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } + else{ + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + if(saslJaasConfig == null){ + logger.info("Not using any authentication for kafka interaction"); + } + else{ + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate<String, String> kafkaTemplate() throws Exception { + return new KafkaTemplate<>(producerFactory()); + } + } +
\ No newline at end of file |