aboutsummaryrefslogtreecommitdiffstats
path: root/aai-core/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'aai-core/src/main/java')
-rw-r--r--aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java146
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java135
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java)6
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java)2
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java16
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java6
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java125
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java95
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java175
9 files changed, 325 insertions, 381 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java
deleted file mode 100644
index d3addebb..00000000
--- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Modifications Copyright © 2018 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.dmaap;
-
-import java.util.Map;
-import java.util.Objects;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
-import org.onap.aai.exceptions.AAIException;
-import org.onap.aai.logging.AaiElsErrorCode;
-import org.onap.aai.logging.ErrorLogHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.core.env.Environment;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.web.client.RestTemplate;
-
-public class AAIDmaapEventJMSConsumer implements MessageListener {
-
- private static final String EVENT_TOPIC = "event-topic";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class);
-
- private RestTemplate restTemplate;
-
- private HttpHeaders httpHeaders;
-
- private Environment environment;
- private Map<String, String> mdcCopy;
-
- public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) {
- super();
- mdcCopy = MDC.getCopyOfContextMap();
- Objects.nonNull(environment);
- Objects.nonNull(restTemplate);
- Objects.nonNull(httpHeaders);
- this.environment = environment;
- this.restTemplate = restTemplate;
- this.httpHeaders = httpHeaders;
- }
-
- @Override
- public void onMessage(Message message) {
-
- if (restTemplate == null) {
- return;
- }
-
- String jsmMessageTxt = "";
- String aaiEvent = "";
- JSONObject aaiEventHeader;
- JSONObject joPayload;
- String transactionId = "";
- String serviceName = "";
- String eventName = "";
- String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
- String errorDescription = "";
-
- if (mdcCopy != null) {
- MDC.setContextMap(mdcCopy);
- }
-
- if (message instanceof TextMessage) {
- AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
- try {
- jsmMessageTxt = ((TextMessage) message).getText();
- JSONObject jo = new JSONObject(jsmMessageTxt);
- if (jo.has("aaiEventPayload")) {
- joPayload = jo.getJSONObject("aaiEventPayload");
- aaiEvent = joPayload.toString();
- } else {
- return;
- }
- if (jo.getString(EVENT_TOPIC) != null) {
- eventName = jo.getString(EVENT_TOPIC);
- }
- if (joPayload.has("event-header")) {
- try {
- aaiEventHeader = joPayload.getJSONObject("event-header");
- if (aaiEventHeader.has("id")) {
- transactionId = aaiEventHeader.get("id").toString();
- }
- if (aaiEventHeader.has("entity-link")) {
- serviceName = aaiEventHeader.get("entity-link").toString();
- }
- } catch (JSONException jexc) {
- // ignore, this is just used for logging
- }
- }
- metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-
- HttpEntity<String> httpEntity = new HttpEntity<String>(aaiEvent, httpHeaders);
-
- String transportType = environment.getProperty("dmaap.ribbon.transportType", "http");
- String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers");
- String endpoint = "/events/" + eventName;
-
- if ("AAI-EVENT".equals(eventName)) {
- restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
- } else {
- LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
- }
- } catch (JMSException | JSONException e) {
- aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7350"));
- } catch (Exception e) {
- aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
- } finally {
- metricLog.post(aaiElsErrorCode, errorDescription);
- }
- }
- }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
new file mode 100644
index 00000000..731f3dfc
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
@@ -0,0 +1,135 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.kafka;
+
+ import java.util.Map;
+ import java.util.Objects;
+
+ import javax.jms.JMSException;
+ import javax.jms.Message;
+ import javax.jms.MessageListener;
+ import javax.jms.TextMessage;
+
+ import org.json.JSONException;
+ import org.json.JSONObject;
+ import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+ import org.onap.aai.exceptions.AAIException;
+ import org.onap.aai.logging.AaiElsErrorCode;
+ import org.onap.aai.logging.ErrorLogHelper;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.slf4j.MDC;
+ import org.springframework.core.env.Environment;
+ import org.springframework.kafka.core.KafkaTemplate;
+
+ public class AAIKafkaEventJMSConsumer implements MessageListener {
+
+ private static final String EVENT_TOPIC = "event-topic";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+
+ private Environment environment;
+ private Map<String, String> mdcCopy;
+ private KafkaTemplate<String,String> kafkaTemplate;
+
+ public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
+ super();
+ mdcCopy = MDC.getCopyOfContextMap();
+ Objects.nonNull(environment);
+ this.environment = environment;
+ this.kafkaTemplate=kafkaTemplate;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+
+ if (kafkaTemplate == null) {
+ return;
+ }
+
+ String jsmMessageTxt = "";
+ String aaiEvent = "";
+ JSONObject aaiEventHeader;
+ JSONObject joPayload;
+ String transactionId = "";
+ String serviceName = "";
+ String eventName = "";
+ String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+ String errorDescription = "";
+
+ if (mdcCopy != null) {
+ MDC.setContextMap(mdcCopy);
+ }
+
+ if (message instanceof TextMessage) {
+ AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+ try {
+ jsmMessageTxt = ((TextMessage) message).getText();
+ JSONObject jo = new JSONObject(jsmMessageTxt);
+ if (jo.has("aaiEventPayload")) {
+ joPayload = jo.getJSONObject("aaiEventPayload");
+ aaiEvent = joPayload.toString();
+ } else {
+ return;
+ }
+ if (jo.getString(EVENT_TOPIC) != null) {
+ eventName = jo.getString(EVENT_TOPIC);
+ }
+ if (joPayload.has("event-header")) {
+ try {
+ aaiEventHeader = joPayload.getJSONObject("event-header");
+ if (aaiEventHeader.has("id")) {
+ transactionId = aaiEventHeader.get("id").toString();
+ }
+ if (aaiEventHeader.has("entity-link")) {
+ serviceName = aaiEventHeader.get("entity-link").toString();
+ }
+ } catch (JSONException jexc) {
+ // ignore, this is just used for logging
+ }
+ }
+ metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
+
+
+ if ("AAI-EVENT".equals(eventName)) {
+ // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
+ kafkaTemplate.send(eventName,aaiEvent);
+
+ } else {
+ LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
+ }
+ } catch (JMSException | JSONException e) {
+ aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7350"));
+ } catch (Exception e) {
+ aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
+ } finally {
+ metricLog.post(aaiElsErrorCode, errorDescription);
+ }
+ }
+ }
+ }
+ \ No newline at end of file
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
index 3d675efe..00cf677f 100644
--- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
@@ -20,7 +20,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
@@ -29,11 +29,11 @@ import org.onap.aai.util.AAIConfig;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
-public class AAIDmaapEventJMSProducer implements MessageProducer {
+public class AAIKafkaEventJMSProducer implements MessageProducer {
private JmsTemplate jmsTemplate;
- public AAIDmaapEventJMSProducer() {
+ public AAIKafkaEventJMSProducer() {
if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
this.jmsTemplate = new JmsTemplate();
String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
index da70d505..d6a491ef 100644
--- a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
import org.json.JSONObject;
diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
index 598ef231..6f3e8883 100644
--- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
+++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
@@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
import org.json.JSONException;
import org.json.JSONObject;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
import org.onap.aai.domain.notificationEvent.NotificationEvent;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.introspection.Introspector;
import org.onap.aai.introspection.Loader;
import org.onap.aai.introspection.exceptions.AAIUnknownObjectException;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@@ -59,12 +59,12 @@ public class StoreNotificationEvent {
* Instantiates a new store notification event.
*/
public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
- this.messageProducer = new AAIDmaapEventJMSProducer();
+ this.messageProducer = new AAIKafkaEventJMSProducer();
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
}
- public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+ public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
this.messageProducer = producer;
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
@@ -139,7 +139,7 @@ public class StoreNotificationEvent {
try {
PojoUtils pu = new PojoUtils();
String entityJson = pu.getJsonFromObject(ne);
- sendToDmaapJmsQueue(entityJson);
+ sendToKafkaJmsQueue(entityJson);
return entityJson;
} catch (Exception e) {
throw new AAIException("AAI_7350", e);
@@ -227,7 +227,7 @@ public class StoreNotificationEvent {
marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false);
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false);
marshaller.marshal(notificationEvent, result);
- this.sendToDmaapJmsQueue(result.toString());
+ this.sendToKafkaJmsQueue(result.toString());
} catch (Exception e) {
throw new AAIException("AAI_7350", e);
@@ -380,7 +380,7 @@ public class StoreNotificationEvent {
notificationEvent.setValue("entity", obj.getUnderlyingObject());
String entityJson = notificationEvent.marshal(false);
- sendToDmaapJmsQueue(entityJson);
+ sendToKafkaJmsQueue(entityJson);
return entityJson;
} catch (JSONException e) {
throw new AAIException("AAI_7350", e);
@@ -389,7 +389,7 @@ public class StoreNotificationEvent {
}
}
- private void sendToDmaapJmsQueue(String entityString) throws JSONException {
+ private void sendToKafkaJmsQueue(String entityString) throws JSONException {
JSONObject entityJsonObject = new JSONObject(entityString);
diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
index 10d47cd6..6fbc297b 100644
--- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
+++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
@@ -28,8 +28,8 @@ import java.util.Date;
import java.util.Map;
import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
import org.onap.aai.util.AAIConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class DeltaEvents {
private MessageProducer messageProducer;
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
- this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer());
+ this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
}
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java
deleted file mode 100644
index 078ca963..00000000
--- a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Profile;
-import org.springframework.http.HttpHeaders;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.web.client.RestTemplate;
-
-@Profile("dmaap")
-@Configuration
-public class DmaapConfig {
-
- @Autowired
- private ApplicationContext ctx;
-
- @Autowired
- @Qualifier("dmaapRestTemplate")
- private RestTemplate dmaapRestTemplate;
-
- @Autowired
- @Qualifier("dmaapHeaders")
- private HttpHeaders dmaapHeaders;
-
- @Value("${jms.bind.address}")
- private String bindAddress;
-
- @PostConstruct
- public void init() {
- System.setProperty("activemq.tcp.url", bindAddress);
- }
-
- @Bean(destroyMethod = "stop")
- public BrokerService brokerService() throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setSchedulerSupport(false);
- broker.start();
-
- return broker;
- }
-
- @Bean(name = "connectionFactory")
- public ActiveMQConnectionFactory activeMQConnectionFactory() {
- return new ActiveMQConnectionFactory(bindAddress);
- }
-
- @Bean
- public CachingConnectionFactory cachingConnectionFactory() {
- return new CachingConnectionFactory(activeMQConnectionFactory());
- }
-
- @Bean(name = "destinationQueue")
- public ActiveMQQueue activeMQQueue() {
- return new ActiveMQQueue("IN_QUEUE");
- }
-
- @Bean
- public JmsTemplate jmsTemplate() {
- JmsTemplate jmsTemplate = new JmsTemplate();
-
- jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
- jmsTemplate.setDefaultDestination(activeMQQueue());
-
- return jmsTemplate;
- }
-
- @Bean
- public AAIDmaapEventJMSProducer jmsProducer() {
- return new AAIDmaapEventJMSProducer();
- }
-
- @Bean(name = "jmsConsumer")
- public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception {
- return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders);
- }
-
- @Bean
- public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-
- DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-
- messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
- messageListenerContainer.setDestinationName("IN_QUEUE");
- messageListenerContainer.setMessageListener(jmsConsumer());
-
- return messageListenerContainer;
- }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java
deleted file mode 100644
index cbd9b105..00000000
--- a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Base64;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.web.client.RestTemplate;
-
-@Configuration
-public class EventClientPublisher {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class);
-
- @Value("${dmaap.ribbon.listOfServers:}")
- private String hosts;
-
- @Value("${dmaap.ribbon.username:}")
- private String username;
-
- @Value("${dmaap.ribbon.password:}")
- private String password;
-
- @Value("${dmaap.ribbon.topic:AAI-EVENT}")
- private String topic;
-
- @Value("${dmaap.ribbon.batchSize:100}")
- private int maxBatchSize;
-
- @Value("${dmaap.ribbon.maxAgeMs:250}")
- private int maxAgeMs;
-
- @Value("${dmaap.ribbon.delayBetweenBatches:100}")
- private int delayBetweenBatches;
-
- @Value("${dmaap.ribbon.protocol:http}")
- private String protocol;
-
- @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}")
- private String tranportType;
-
- @Value("${dmaap.ribbon.contentType:application/json}")
- private String contentType;
-
- @Bean(name = "dmaapRestTemplate")
- public RestTemplate dmaapRestTemplate() {
- return new RestTemplate();
- }
-
- @Bean(name = "dmaapHeaders")
- public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException {
-
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.setContentType(MediaType.APPLICATION_JSON);
-
- if (username != null && password != null) {
-
- if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) {
-
- byte[] userPass = (username + ":" + password).getBytes("UTF-8");
-
- httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass));
- }
- }
-
- return httpHeaders;
- }
-
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
new file mode 100644
index 00000000..71ae5b6b
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
@@ -0,0 +1,175 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.web;
+
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import javax.annotation.PostConstruct;
+
+ import org.apache.activemq.ActiveMQConnectionFactory;
+ import org.apache.activemq.broker.BrokerService;
+ import org.apache.activemq.command.ActiveMQQueue;
+ import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.beans.factory.annotation.Value;
+ import org.springframework.context.ApplicationContext;
+ import org.springframework.context.annotation.Bean;
+ import org.springframework.context.annotation.Configuration;
+ import org.springframework.context.annotation.Profile;
+ import org.springframework.jms.connection.CachingConnectionFactory;
+ import org.springframework.jms.core.JmsTemplate;
+ import org.springframework.jms.listener.DefaultMessageListenerContainer;
+ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+ import org.springframework.kafka.core.KafkaTemplate;
+ import org.springframework.kafka.core.ProducerFactory;
+
+ @Profile("kafka")
+ @Configuration
+ public class KafkaConfig {
+
+ @Autowired
+ private ApplicationContext ctx;
+
+
+ @Value("${jms.bind.address}")
+ private String bindAddress;
+
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.producer.properties.security.protocol}")
+ private String securityProtocol;
+
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+ private String saslJaasConfig;
+
+ @Value("${spring.kafka.producer.retries}")
+ private Integer retries;
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+
+ @PostConstruct
+ public void init() {
+ System.setProperty("activemq.tcp.url", bindAddress);
+ }
+
+ @Bean(destroyMethod = "stop")
+ public BrokerService brokerService() throws Exception {
+
+ BrokerService broker = new BrokerService();
+ broker.addConnector(bindAddress);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setSchedulerSupport(false);
+ broker.start();
+
+ return broker;
+ }
+
+ @Bean(name = "connectionFactory")
+ public ActiveMQConnectionFactory activeMQConnectionFactory() {
+ return new ActiveMQConnectionFactory(bindAddress);
+ }
+
+ @Bean
+ public CachingConnectionFactory cachingConnectionFactory() {
+ return new CachingConnectionFactory(activeMQConnectionFactory());
+ }
+
+ @Bean(name = "destinationQueue")
+ public ActiveMQQueue activeMQQueue() {
+ return new ActiveMQQueue("IN_QUEUE");
+ }
+
+ @Bean
+ public JmsTemplate jmsTemplate() {
+ JmsTemplate jmsTemplate = new JmsTemplate();
+
+ jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
+ jmsTemplate.setDefaultDestination(activeMQQueue());
+
+ return jmsTemplate;
+ }
+
+ @Bean
+ public AAIKafkaEventJMSProducer jmsProducer() {
+ return new AAIKafkaEventJMSProducer();
+ }
+
+ @Bean(name = "jmsConsumer")
+ public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
+ return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
+ }
+
+ @Bean
+ public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
+
+ DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+
+ messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
+ messageListenerContainer.setDestinationName("IN_QUEUE");
+ messageListenerContainer.setMessageListener(jmsConsumer());
+
+ return messageListenerContainer;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ if(bootstrapServers == null){
+ logger.error("Environment Variable " + bootstrapServers + " is missing");
+ throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+ }
+ else{
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+ if(saslJaasConfig == null){
+ logger.info("Not using any authentication for kafka interaction");
+ }
+ else{
+ logger.info("Using authentication provided by kafka interaction");
+ // Strimzi Kafka security properties
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("security.protocol", securityProtocol);
+ props.put("sasl.mechanism", saslMechanism);
+ props.put("sasl.jaas.config", saslJaasConfig);
+ props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
+ }
+
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
+ return new KafkaTemplate<>(producerFactory());
+ }
+ }
+ \ No newline at end of file