diff options
Diffstat (limited to 'aai-core/src')
15 files changed, 572 insertions, 411 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java deleted file mode 100644 index d3addebb..00000000 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Modifications Copyright © 2018 IBM. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.dmaap; - -import java.util.Map; -import java.util.Objects; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.TextMessage; - -import org.json.JSONException; -import org.json.JSONObject; -import org.onap.aai.aailog.logs.AaiDmaapMetricLog; -import org.onap.aai.exceptions.AAIException; -import org.onap.aai.logging.AaiElsErrorCode; -import org.onap.aai.logging.ErrorLogHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.core.env.Environment; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.web.client.RestTemplate; - -public class AAIDmaapEventJMSConsumer implements MessageListener { - - private static final String EVENT_TOPIC = "event-topic"; - - private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class); - - private RestTemplate restTemplate; - - private HttpHeaders httpHeaders; - - private Environment environment; - private Map<String, String> mdcCopy; - - public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) { - super(); - mdcCopy = MDC.getCopyOfContextMap(); - Objects.nonNull(environment); - Objects.nonNull(restTemplate); - Objects.nonNull(httpHeaders); - this.environment = environment; - this.restTemplate = restTemplate; - this.httpHeaders = httpHeaders; - } - - @Override - public void onMessage(Message message) { - - if (restTemplate == null) { - return; - } - - String jsmMessageTxt = ""; - String aaiEvent = ""; - JSONObject aaiEventHeader; - JSONObject joPayload; - String transactionId = ""; - String serviceName = ""; - String eventName = ""; - String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; - String errorDescription = ""; - - if (mdcCopy != null) { - MDC.setContextMap(mdcCopy); - } - - if (message instanceof TextMessage) { - AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); - try { - jsmMessageTxt = ((TextMessage) message).getText(); - JSONObject jo = new JSONObject(jsmMessageTxt); - if (jo.has("aaiEventPayload")) { - joPayload = jo.getJSONObject("aaiEventPayload"); - aaiEvent = joPayload.toString(); - } else { - return; - } - if (jo.getString(EVENT_TOPIC) != null) { - eventName = jo.getString(EVENT_TOPIC); - } - if (joPayload.has("event-header")) { - try { - aaiEventHeader = joPayload.getJSONObject("event-header"); - if (aaiEventHeader.has("id")) { - transactionId = aaiEventHeader.get("id").toString(); - } - if (aaiEventHeader.has("entity-link")) { - serviceName = aaiEventHeader.get("entity-link").toString(); - } - } catch (JSONException jexc) { - // ignore, this is just used for logging - } - } - metricLog.pre(eventName, aaiEvent, transactionId, serviceName); - - HttpEntity<String> httpEntity = new HttpEntity<String>(aaiEvent, httpHeaders); - - String transportType = environment.getProperty("dmaap.ribbon.transportType", "http"); - String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers"); - String endpoint = "/events/" + eventName; - - if ("AAI-EVENT".equals(eventName)) { - restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); - } else { - LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); - } - } catch (JMSException | JSONException e) { - aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7350")); - } catch (Exception e) { - aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; - errorDescription = e.getMessage(); - ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); - } finally { - metricLog.post(aaiElsErrorCode, errorDescription); - } - } - } -} diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java new file mode 100644 index 00000000..731f3dfc --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java @@ -0,0 +1,135 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Modifications Copyright © 2018 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.kafka; + + import java.util.Map; + import java.util.Objects; + + import javax.jms.JMSException; + import javax.jms.Message; + import javax.jms.MessageListener; + import javax.jms.TextMessage; + + import org.json.JSONException; + import org.json.JSONObject; + import org.onap.aai.aailog.logs.AaiDmaapMetricLog; + import org.onap.aai.exceptions.AAIException; + import org.onap.aai.logging.AaiElsErrorCode; + import org.onap.aai.logging.ErrorLogHelper; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.MDC; + import org.springframework.core.env.Environment; + import org.springframework.kafka.core.KafkaTemplate; + + public class AAIKafkaEventJMSConsumer implements MessageListener { + + private static final String EVENT_TOPIC = "event-topic"; + + private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class); + + private Environment environment; + private Map<String, String> mdcCopy; + private KafkaTemplate<String,String> kafkaTemplate; + + public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) { + super(); + mdcCopy = MDC.getCopyOfContextMap(); + Objects.nonNull(environment); + this.environment = environment; + this.kafkaTemplate=kafkaTemplate; + } + + @Override + public void onMessage(Message message) { + + if (kafkaTemplate == null) { + return; + } + + String jsmMessageTxt = ""; + String aaiEvent = ""; + JSONObject aaiEventHeader; + JSONObject joPayload; + String transactionId = ""; + String serviceName = ""; + String eventName = ""; + String aaiElsErrorCode = AaiElsErrorCode.SUCCESS; + String errorDescription = ""; + + if (mdcCopy != null) { + MDC.setContextMap(mdcCopy); + } + + if (message instanceof TextMessage) { + AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog(); + try { + jsmMessageTxt = ((TextMessage) message).getText(); + JSONObject jo = new JSONObject(jsmMessageTxt); + if (jo.has("aaiEventPayload")) { + joPayload = jo.getJSONObject("aaiEventPayload"); + aaiEvent = joPayload.toString(); + } else { + return; + } + if (jo.getString(EVENT_TOPIC) != null) { + eventName = jo.getString(EVENT_TOPIC); + } + if (joPayload.has("event-header")) { + try { + aaiEventHeader = joPayload.getJSONObject("event-header"); + if (aaiEventHeader.has("id")) { + transactionId = aaiEventHeader.get("id").toString(); + } + if (aaiEventHeader.has("entity-link")) { + serviceName = aaiEventHeader.get("entity-link").toString(); + } + } catch (JSONException jexc) { + // ignore, this is just used for logging + } + } + metricLog.pre(eventName, aaiEvent, transactionId, serviceName); + + + if ("AAI-EVENT".equals(eventName)) { + // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class); + kafkaTemplate.send(eventName,aaiEvent); + + } else { + LOGGER.error(String.format("%s|Event Topic invalid.", eventName)); + } + } catch (JMSException | JSONException e) { + aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7350")); + } catch (Exception e) { + aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR; + errorDescription = e.getMessage(); + ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt)); + } finally { + metricLog.post(aaiElsErrorCode, errorDescription); + } + } + } + } +
\ No newline at end of file diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java index 3d675efe..00cf677f 100644 --- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -20,7 +20,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.aai.dmaap; +package org.onap.aai.kafka; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; @@ -29,11 +29,11 @@ import org.onap.aai.util.AAIConfig; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; -public class AAIDmaapEventJMSProducer implements MessageProducer { +public class AAIKafkaEventJMSProducer implements MessageProducer { private JmsTemplate jmsTemplate; - public AAIDmaapEventJMSProducer() { + public AAIKafkaEventJMSProducer() { if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) { this.jmsTemplate = new JmsTemplate(); String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547"); diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java index da70d505..d6a491ef 100644 --- a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.aai.dmaap; +package org.onap.aai.kafka; import org.json.JSONObject; diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java index 598ef231..6f3e8883 100644 --- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java +++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java @@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; import org.json.JSONException; import org.json.JSONObject; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; import org.onap.aai.domain.notificationEvent.NotificationEvent; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; import org.onap.aai.introspection.exceptions.AAIUnknownObjectException; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; @@ -59,12 +59,12 @@ public class StoreNotificationEvent { * Instantiates a new store notification event. */ public StoreNotificationEvent(String transactionId, String sourceOfTruth) { - this.messageProducer = new AAIDmaapEventJMSProducer(); + this.messageProducer = new AAIKafkaEventJMSProducer(); this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; } - public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) { + public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) { this.messageProducer = producer; this.transactionId = transactionId; this.sourceOfTruth = sourceOfTruth; @@ -139,7 +139,7 @@ public class StoreNotificationEvent { try { PojoUtils pu = new PojoUtils(); String entityJson = pu.getJsonFromObject(ne); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -227,7 +227,7 @@ public class StoreNotificationEvent { marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false); marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false); marshaller.marshal(notificationEvent, result); - this.sendToDmaapJmsQueue(result.toString()); + this.sendToKafkaJmsQueue(result.toString()); } catch (Exception e) { throw new AAIException("AAI_7350", e); @@ -380,7 +380,7 @@ public class StoreNotificationEvent { notificationEvent.setValue("entity", obj.getUnderlyingObject()); String entityJson = notificationEvent.marshal(false); - sendToDmaapJmsQueue(entityJson); + sendToKafkaJmsQueue(entityJson); return entityJson; } catch (JSONException e) { throw new AAIException("AAI_7350", e); @@ -389,7 +389,7 @@ public class StoreNotificationEvent { } } - private void sendToDmaapJmsQueue(String entityString) throws JSONException { + private void sendToKafkaJmsQueue(String entityString) throws JSONException { JSONObject entityJsonObject = new JSONObject(entityString); diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java index 10d47cd6..6fbc297b 100644 --- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java +++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java @@ -28,8 +28,8 @@ import java.util.Date; import java.util.Map; import org.onap.aai.db.props.AAIProperties; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.onap.aai.dmaap.MessageProducer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.onap.aai.kafka.MessageProducer; import org.onap.aai.util.AAIConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class DeltaEvents { private MessageProducer messageProducer; public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) { - this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer()); + this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer()); } public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas, diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java deleted file mode 100644 index 078ca963..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import javax.annotation.PostConstruct; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; -import org.springframework.http.HttpHeaders; -import org.springframework.jms.connection.CachingConnectionFactory; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.listener.DefaultMessageListenerContainer; -import org.springframework.web.client.RestTemplate; - -@Profile("dmaap") -@Configuration -public class DmaapConfig { - - @Autowired - private ApplicationContext ctx; - - @Autowired - @Qualifier("dmaapRestTemplate") - private RestTemplate dmaapRestTemplate; - - @Autowired - @Qualifier("dmaapHeaders") - private HttpHeaders dmaapHeaders; - - @Value("${jms.bind.address}") - private String bindAddress; - - @PostConstruct - public void init() { - System.setProperty("activemq.tcp.url", bindAddress); - } - - @Bean(destroyMethod = "stop") - public BrokerService brokerService() throws Exception { - - BrokerService broker = new BrokerService(); - broker.addConnector(bindAddress); - broker.setPersistent(false); - broker.setUseJmx(false); - broker.setSchedulerSupport(false); - broker.start(); - - return broker; - } - - @Bean(name = "connectionFactory") - public ActiveMQConnectionFactory activeMQConnectionFactory() { - return new ActiveMQConnectionFactory(bindAddress); - } - - @Bean - public CachingConnectionFactory cachingConnectionFactory() { - return new CachingConnectionFactory(activeMQConnectionFactory()); - } - - @Bean(name = "destinationQueue") - public ActiveMQQueue activeMQQueue() { - return new ActiveMQQueue("IN_QUEUE"); - } - - @Bean - public JmsTemplate jmsTemplate() { - JmsTemplate jmsTemplate = new JmsTemplate(); - - jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); - jmsTemplate.setDefaultDestination(activeMQQueue()); - - return jmsTemplate; - } - - @Bean - public AAIDmaapEventJMSProducer jmsProducer() { - return new AAIDmaapEventJMSProducer(); - } - - @Bean(name = "jmsConsumer") - public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception { - return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders); - } - - @Bean - public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { - - DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); - - messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); - messageListenerContainer.setDestinationName("IN_QUEUE"); - messageListenerContainer.setMessageListener(jmsConsumer()); - - return messageListenerContainer; - } -} diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java deleted file mode 100644 index cbd9b105..00000000 --- a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * ============LICENSE_START======================================================= - * org.onap.aai - * ================================================================================ - * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.aai.web; - -import java.io.UnsupportedEncodingException; -import java.util.Base64; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.web.client.RestTemplate; - -@Configuration -public class EventClientPublisher { - - private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class); - - @Value("${dmaap.ribbon.listOfServers:}") - private String hosts; - - @Value("${dmaap.ribbon.username:}") - private String username; - - @Value("${dmaap.ribbon.password:}") - private String password; - - @Value("${dmaap.ribbon.topic:AAI-EVENT}") - private String topic; - - @Value("${dmaap.ribbon.batchSize:100}") - private int maxBatchSize; - - @Value("${dmaap.ribbon.maxAgeMs:250}") - private int maxAgeMs; - - @Value("${dmaap.ribbon.delayBetweenBatches:100}") - private int delayBetweenBatches; - - @Value("${dmaap.ribbon.protocol:http}") - private String protocol; - - @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}") - private String tranportType; - - @Value("${dmaap.ribbon.contentType:application/json}") - private String contentType; - - @Bean(name = "dmaapRestTemplate") - public RestTemplate dmaapRestTemplate() { - return new RestTemplate(); - } - - @Bean(name = "dmaapHeaders") - public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException { - - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.setContentType(MediaType.APPLICATION_JSON); - - if (username != null && password != null) { - - if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) { - - byte[] userPass = (username + ":" + password).getBytes("UTF-8"); - - httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass)); - } - } - - return httpHeaders; - } - -} diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java new file mode 100644 index 00000000..71ae5b6b --- /dev/null +++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java @@ -0,0 +1,175 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + package org.onap.aai.web; + + import java.util.HashMap; + import java.util.Map; + + import javax.annotation.PostConstruct; + + import org.apache.activemq.ActiveMQConnectionFactory; + import org.apache.activemq.broker.BrokerService; + import org.apache.activemq.command.ActiveMQQueue; + import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.aai.kafka.AAIKafkaEventJMSConsumer; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; +import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.beans.factory.annotation.Value; + import org.springframework.context.ApplicationContext; + import org.springframework.context.annotation.Bean; + import org.springframework.context.annotation.Configuration; + import org.springframework.context.annotation.Profile; + import org.springframework.jms.connection.CachingConnectionFactory; + import org.springframework.jms.core.JmsTemplate; + import org.springframework.jms.listener.DefaultMessageListenerContainer; + import org.springframework.kafka.core.DefaultKafkaProducerFactory; + import org.springframework.kafka.core.KafkaTemplate; + import org.springframework.kafka.core.ProducerFactory; + + @Profile("kafka") + @Configuration + public class KafkaConfig { + + @Autowired + private ApplicationContext ctx; + + + @Value("${jms.bind.address}") + private String bindAddress; + + @Value("${spring.kafka.producer.bootstrap-servers}") + private String bootstrapServers; + + @Value("${spring.kafka.producer.properties.security.protocol}") + private String securityProtocol; + + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String saslJaasConfig; + + @Value("${spring.kafka.producer.retries}") + private Integer retries; + + private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class); + + @PostConstruct + public void init() { + System.setProperty("activemq.tcp.url", bindAddress); + } + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector(bindAddress); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } + + @Bean(name = "connectionFactory") + public ActiveMQConnectionFactory activeMQConnectionFactory() { + return new ActiveMQConnectionFactory(bindAddress); + } + + @Bean + public CachingConnectionFactory cachingConnectionFactory() { + return new CachingConnectionFactory(activeMQConnectionFactory()); + } + + @Bean(name = "destinationQueue") + public ActiveMQQueue activeMQQueue() { + return new ActiveMQQueue("IN_QUEUE"); + } + + @Bean + public JmsTemplate jmsTemplate() { + JmsTemplate jmsTemplate = new JmsTemplate(); + + jmsTemplate.setConnectionFactory(activeMQConnectionFactory()); + jmsTemplate.setDefaultDestination(activeMQQueue()); + + return jmsTemplate; + } + + @Bean + public AAIKafkaEventJMSProducer jmsProducer() { + return new AAIKafkaEventJMSProducer(); + } + + @Bean(name = "jmsConsumer") + public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception { + return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate()); + } + + @Bean + public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception { + + DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(); + + messageListenerContainer.setConnectionFactory(cachingConnectionFactory()); + messageListenerContainer.setDestinationName("IN_QUEUE"); + messageListenerContainer.setMessageListener(jmsConsumer()); + + return messageListenerContainer; + } + + @Bean + public ProducerFactory<String, String> producerFactory() throws Exception { + Map<String, Object> props = new HashMap<>(); + if(bootstrapServers == null){ + logger.error("Environment Variable " + bootstrapServers + " is missing"); + throw new Exception("Environment Variable " + bootstrapServers + " is missing"); + } + else{ + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + if(saslJaasConfig == null){ + logger.info("Not using any authentication for kafka interaction"); + } + else{ + logger.info("Using authentication provided by kafka interaction"); + // Strimzi Kafka security properties + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("security.protocol", securityProtocol); + props.put("sasl.mechanism", saslMechanism); + props.put("sasl.jaas.config", saslJaasConfig); + props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries)); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5"); + } + + return new DefaultKafkaProducerFactory<>(props); + } + + @Bean + public KafkaTemplate<String, String> kafkaTemplate() throws Exception { + return new KafkaTemplate<>(producerFactory()); + } + } +
\ No newline at end of file diff --git a/aai-core/src/main/resources/logback.xml b/aai-core/src/main/resources/logback.xml index f2b1399f..fc66b32e 100644 --- a/aai-core/src/main/resources/logback.xml +++ b/aai-core/src/main/resources/logback.xml @@ -173,14 +173,14 @@ <appender-ref ref="translog" /> </appender> - <appender name="dmaapAAIEventConsumer" + <appender name="kafkaAAIEventConsumer" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>WARN</level> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> @@ -188,32 +188,32 @@ </encoder> </appender> - <appender name="dmaapAAIEventConsumerDebug" + <appender name="kafkaAAIEventConsumerDebug" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>DEBUG</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> <pattern>${eelfLogPattern}</pattern> </encoder> </appender> - <appender name="dmaapAAIEventConsumerMetric" + <appender name="kafkaAAIEventConsumerMetric" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>INFO</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> @@ -365,10 +365,10 @@ <appender-ref ref="asyncAUDIT"/> </logger> - <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false"> - <appender-ref ref="dmaapAAIEventConsumer" /> - <appender-ref ref="dmaapAAIEventConsumerDebug" /> - <appender-ref ref="dmaapAAIEventConsumerMetric" /> + <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false"> + <appender-ref ref="kafkaAAIEventConsumer" /> + <appender-ref ref="kafkaAAIEventConsumerDebug" /> + <appender-ref ref="kafkaAAIEventConsumerMetric" /> </logger> <logger name="org.apache" level="OFF" /> diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java new file mode 100644 index 00000000..c72499c4 --- /dev/null +++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java @@ -0,0 +1,89 @@ +package org.onap.aai.kafka; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import javax.jms.TextMessage; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.aai.PayloadUtil; +import org.springframework.core.env.Environment; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.util.ReflectionTestUtils; + +@RunWith(MockitoJUnitRunner.class) +@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) +public class AAIKafkaEventJMSConsumerTest { + + @Mock + private Environment environment; + + @Mock + private KafkaTemplate<String,String> kafkaTemplate; + + private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer; + + @Before + public void setUp(){ + aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate); + } + + @Test + public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived() + throws Exception + { + TextMessage mockTextMessage = mock(TextMessage.class); + String payload = PayloadUtil.getResourcePayload("aai-event.json"); + + when(mockTextMessage.getText()).thenReturn(payload); + aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); + verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString()); + } + + @Test + public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{ + TextMessage mockTextMessage = mock(TextMessage.class); + String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json"); + when(mockTextMessage.getText()).thenReturn(payload); + aaiKafkaEventJMSConsumer.onMessage(mockTextMessage); + } + + + @Test + public void onMessage_shouldHandleJSONException() throws Exception { + // Arrange + AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); + TextMessage mockTextMessage = mock(TextMessage.class); + ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate + + // Act + consumer.onMessage(mockTextMessage); + + // Assert + // Verify that exception is logged + } + + @Test + public void onMessage_shouldHandleGenericException() throws Exception { + // Arrange + AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate); + TextMessage mockTextMessage = mock(TextMessage.class); + when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields + + // Act + consumer.onMessage(mockTextMessage); + + // Assert + // Verify that exception is logged + } + +} diff --git a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java index b4b8810e..a0c3f639 100644 --- a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java +++ b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java @@ -39,27 +39,27 @@ import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.onap.aai.AAISetup; -import org.onap.aai.dmaap.AAIDmaapEventJMSProducer; import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader; import org.onap.aai.exceptions.AAIException; import org.onap.aai.introspection.Introspector; import org.onap.aai.introspection.Loader; import org.onap.aai.introspection.ModelType; +import org.onap.aai.kafka.AAIKafkaEventJMSProducer; public class StoreNotificationEventTest extends AAISetup { - private static AAIDmaapEventJMSProducer producer; + private static AAIKafkaEventJMSProducer producer; private static StoreNotificationEvent sne; @BeforeClass public static void setUp() { - producer = Mockito.mock(AAIDmaapEventJMSProducer.class); + producer = Mockito.mock(AAIKafkaEventJMSProducer.class); // sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth"); } @Before public void setUpBefore() { - producer = Mockito.mock(AAIDmaapEventJMSProducer.class); + producer = Mockito.mock(AAIKafkaEventJMSProducer.class); sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth"); } diff --git a/aai-core/src/test/resources/logback.xml b/aai-core/src/test/resources/logback.xml index 5d4f7bf3..4c82c0bf 100644 --- a/aai-core/src/test/resources/logback.xml +++ b/aai-core/src/test/resources/logback.xml @@ -170,14 +170,14 @@ <appender-ref ref="translog" /> </appender> - <appender name="dmaapAAIEventConsumer" + <appender name="kafkaAAIEventConsumer" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>WARN</level> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> @@ -185,32 +185,32 @@ </encoder> </appender> - <appender name="dmaapAAIEventConsumerDebug" + <appender name="kafkaAAIEventConsumerDebug" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>DEBUG</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> <pattern>${eelfLogPattern}</pattern> </encoder> </appender> - <appender name="dmaapAAIEventConsumerMetric" + <appender name="kafkaAAIEventConsumerMetric" class="ch.qos.logback.core.rolling.RollingFileAppender"> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>INFO</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> - <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File> + <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} + <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd} </fileNamePattern> </rollingPolicy> <encoder class="org.onap.aai.logging.EcompEncoder"> @@ -362,10 +362,10 @@ <appender-ref ref="asyncAUDIT"/> </logger> - <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false"> - <appender-ref ref="dmaapAAIEventConsumer" /> - <appender-ref ref="dmaapAAIEventConsumerDebug" /> - <appender-ref ref="dmaapAAIEventConsumerMetric" /> + <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false"> + <appender-ref ref="kafkaAAIEventConsumer" /> + <appender-ref ref="kafkaAAIEventConsumerDebug" /> + <appender-ref ref="kafkaAAIEventConsumerMetric" /> </logger> <logger name="org.apache" level="WARN" /> diff --git a/aai-core/src/test/resources/payloads/resource/aai-event.json b/aai-core/src/test/resources/payloads/resource/aai-event.json new file mode 100644 index 00000000..0fab96da --- /dev/null +++ b/aai-core/src/test/resources/payloads/resource/aai-event.json @@ -0,0 +1,64 @@ +{ + "event-topic": "AAI-EVENT", + "aaiEventPayload": { + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "object-group", + "top-entity-type": "object-group", + "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "UPDATE", + "sequence-number": "0", + "id": "23f12123-c326-48a7-b57e-e48746c295ea", + "source-name": "postman-api", + "version": "v28", + "timestamp": "20231207-12:14:44:757" + }, + "entity": { + "relationship-list": { + "relationship": [ + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193273958916", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193273958916", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913" + } + ] + }, + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193272330241", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193272330241", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803" + } + ] + } + ] + }, + "group-name": "Urban", + "resource-version": "1701951284582", + "group-type": "cell", + "object-group-id": "ric_cluster" + } + } +}
\ No newline at end of file diff --git a/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json new file mode 100644 index 00000000..77b2fc1f --- /dev/null +++ b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json @@ -0,0 +1,64 @@ +{ + "event-topic": "AAI-INVALID-EVENT", + "aaiEventPayload": { + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "object-group", + "top-entity-type": "object-group", + "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster", + "event-type": "AAI-EVENT", + "domain": "dev", + "action": "UPDATE", + "sequence-number": "0", + "id": "23f12123-c326-48a7-b57e-e48746c295ea", + "source-name": "postman-api", + "version": "v28", + "timestamp": "20231207-12:14:44:757" + }, + "entity": { + "relationship-list": { + "relationship": [ + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193273958916", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193273958916", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913" + } + ] + }, + { + "related-to": "cell", + "relationship-data": [ + { + "relationship-value": "445611193272330241", + "relationship-key": "cell.cell-id" + } + ], + "related-link": "/aai/v28/network/cells/cell/445611193272330241", + "relationship-label": "org.onap.relationships.inventory.MemberOf", + "related-to-property": [ + { + "property-key": "cell.cell-name", + "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803" + } + ] + } + ] + }, + "group-name": "Urban", + "resource-version": "1701951284582", + "group-type": "cell", + "object-group-id": "ric_cluster" + } + } +}
\ No newline at end of file |