summaryrefslogtreecommitdiffstats
path: root/aai-core
diff options
context:
space:
mode:
Diffstat (limited to 'aai-core')
-rw-r--r--aai-core/pom.xml21
-rw-r--r--aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java146
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java135
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java)6
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java (renamed from aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java)2
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java16
-rw-r--r--aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java6
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java125
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java95
-rw-r--r--aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java175
-rw-r--r--aai-core/src/main/resources/logback.xml26
-rw-r--r--aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java89
-rw-r--r--aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java8
-rw-r--r--aai-core/src/test/resources/logback.xml26
-rw-r--r--aai-core/src/test/resources/payloads/resource/aai-event.json64
-rw-r--r--aai-core/src/test/resources/payloads/resource/aai-invalid-event.json64
16 files changed, 593 insertions, 411 deletions
diff --git a/aai-core/pom.xml b/aai-core/pom.xml
index fb646692..e48e8f58 100644
--- a/aai-core/pom.xml
+++ b/aai-core/pom.xml
@@ -136,6 +136,17 @@ limitations under the License.
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.att.eelf</groupId>
<artifactId>eelf-core</artifactId>
<version>2.0.0-oss</version>
@@ -191,6 +202,16 @@ limitations under the License.
<artifactId>jackson-jaxrs-json-provider</artifactId>
</dependency>
<dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <version>2.7.14</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java
deleted file mode 100644
index d3addebb..00000000
--- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSConsumer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Modifications Copyright © 2018 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.dmaap;
-
-import java.util.Map;
-import java.util.Objects;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
-import org.onap.aai.exceptions.AAIException;
-import org.onap.aai.logging.AaiElsErrorCode;
-import org.onap.aai.logging.ErrorLogHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.core.env.Environment;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.web.client.RestTemplate;
-
-public class AAIDmaapEventJMSConsumer implements MessageListener {
-
- private static final String EVENT_TOPIC = "event-topic";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(AAIDmaapEventJMSConsumer.class);
-
- private RestTemplate restTemplate;
-
- private HttpHeaders httpHeaders;
-
- private Environment environment;
- private Map<String, String> mdcCopy;
-
- public AAIDmaapEventJMSConsumer(Environment environment, RestTemplate restTemplate, HttpHeaders httpHeaders) {
- super();
- mdcCopy = MDC.getCopyOfContextMap();
- Objects.nonNull(environment);
- Objects.nonNull(restTemplate);
- Objects.nonNull(httpHeaders);
- this.environment = environment;
- this.restTemplate = restTemplate;
- this.httpHeaders = httpHeaders;
- }
-
- @Override
- public void onMessage(Message message) {
-
- if (restTemplate == null) {
- return;
- }
-
- String jsmMessageTxt = "";
- String aaiEvent = "";
- JSONObject aaiEventHeader;
- JSONObject joPayload;
- String transactionId = "";
- String serviceName = "";
- String eventName = "";
- String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
- String errorDescription = "";
-
- if (mdcCopy != null) {
- MDC.setContextMap(mdcCopy);
- }
-
- if (message instanceof TextMessage) {
- AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
- try {
- jsmMessageTxt = ((TextMessage) message).getText();
- JSONObject jo = new JSONObject(jsmMessageTxt);
- if (jo.has("aaiEventPayload")) {
- joPayload = jo.getJSONObject("aaiEventPayload");
- aaiEvent = joPayload.toString();
- } else {
- return;
- }
- if (jo.getString(EVENT_TOPIC) != null) {
- eventName = jo.getString(EVENT_TOPIC);
- }
- if (joPayload.has("event-header")) {
- try {
- aaiEventHeader = joPayload.getJSONObject("event-header");
- if (aaiEventHeader.has("id")) {
- transactionId = aaiEventHeader.get("id").toString();
- }
- if (aaiEventHeader.has("entity-link")) {
- serviceName = aaiEventHeader.get("entity-link").toString();
- }
- } catch (JSONException jexc) {
- // ignore, this is just used for logging
- }
- }
- metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
-
- HttpEntity<String> httpEntity = new HttpEntity<String>(aaiEvent, httpHeaders);
-
- String transportType = environment.getProperty("dmaap.ribbon.transportType", "http");
- String baseUrl = transportType + "://" + environment.getProperty("dmaap.ribbon.listOfServers");
- String endpoint = "/events/" + eventName;
-
- if ("AAI-EVENT".equals(eventName)) {
- restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
- } else {
- LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
- }
- } catch (JMSException | JSONException e) {
- aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7350"));
- } catch (Exception e) {
- aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
- errorDescription = e.getMessage();
- ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
- } finally {
- metricLog.post(aaiElsErrorCode, errorDescription);
- }
- }
- }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
new file mode 100644
index 00000000..731f3dfc
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumer.java
@@ -0,0 +1,135 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Modifications Copyright © 2018 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.kafka;
+
+ import java.util.Map;
+ import java.util.Objects;
+
+ import javax.jms.JMSException;
+ import javax.jms.Message;
+ import javax.jms.MessageListener;
+ import javax.jms.TextMessage;
+
+ import org.json.JSONException;
+ import org.json.JSONObject;
+ import org.onap.aai.aailog.logs.AaiDmaapMetricLog;
+ import org.onap.aai.exceptions.AAIException;
+ import org.onap.aai.logging.AaiElsErrorCode;
+ import org.onap.aai.logging.ErrorLogHelper;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.slf4j.MDC;
+ import org.springframework.core.env.Environment;
+ import org.springframework.kafka.core.KafkaTemplate;
+
+ public class AAIKafkaEventJMSConsumer implements MessageListener {
+
+ private static final String EVENT_TOPIC = "event-topic";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AAIKafkaEventJMSConsumer.class);
+
+ private Environment environment;
+ private Map<String, String> mdcCopy;
+ private KafkaTemplate<String,String> kafkaTemplate;
+
+ public AAIKafkaEventJMSConsumer(Environment environment,KafkaTemplate<String,String> kafkaTemplate) {
+ super();
+ mdcCopy = MDC.getCopyOfContextMap();
+ Objects.nonNull(environment);
+ this.environment = environment;
+ this.kafkaTemplate=kafkaTemplate;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+
+ if (kafkaTemplate == null) {
+ return;
+ }
+
+ String jsmMessageTxt = "";
+ String aaiEvent = "";
+ JSONObject aaiEventHeader;
+ JSONObject joPayload;
+ String transactionId = "";
+ String serviceName = "";
+ String eventName = "";
+ String aaiElsErrorCode = AaiElsErrorCode.SUCCESS;
+ String errorDescription = "";
+
+ if (mdcCopy != null) {
+ MDC.setContextMap(mdcCopy);
+ }
+
+ if (message instanceof TextMessage) {
+ AaiDmaapMetricLog metricLog = new AaiDmaapMetricLog();
+ try {
+ jsmMessageTxt = ((TextMessage) message).getText();
+ JSONObject jo = new JSONObject(jsmMessageTxt);
+ if (jo.has("aaiEventPayload")) {
+ joPayload = jo.getJSONObject("aaiEventPayload");
+ aaiEvent = joPayload.toString();
+ } else {
+ return;
+ }
+ if (jo.getString(EVENT_TOPIC) != null) {
+ eventName = jo.getString(EVENT_TOPIC);
+ }
+ if (joPayload.has("event-header")) {
+ try {
+ aaiEventHeader = joPayload.getJSONObject("event-header");
+ if (aaiEventHeader.has("id")) {
+ transactionId = aaiEventHeader.get("id").toString();
+ }
+ if (aaiEventHeader.has("entity-link")) {
+ serviceName = aaiEventHeader.get("entity-link").toString();
+ }
+ } catch (JSONException jexc) {
+ // ignore, this is just used for logging
+ }
+ }
+ metricLog.pre(eventName, aaiEvent, transactionId, serviceName);
+
+
+ if ("AAI-EVENT".equals(eventName)) {
+ // restTemplate.exchange(baseUrl + endpoint, HttpMethod.POST, httpEntity, String.class);
+ kafkaTemplate.send(eventName,aaiEvent);
+
+ } else {
+ LOGGER.error(String.format("%s|Event Topic invalid.", eventName));
+ }
+ } catch (JMSException | JSONException e) {
+ aaiElsErrorCode = AaiElsErrorCode.DATA_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7350"));
+ } catch (Exception e) {
+ aaiElsErrorCode = AaiElsErrorCode.AVAILABILITY_TIMEOUT_ERROR;
+ errorDescription = e.getMessage();
+ ErrorLogHelper.logException(new AAIException("AAI_7304", jsmMessageTxt));
+ } finally {
+ metricLog.post(aaiElsErrorCode, errorDescription);
+ }
+ }
+ }
+ }
+ \ No newline at end of file
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
index 3d675efe..00cf677f 100644
--- a/aai-core/src/main/java/org/onap/aai/dmaap/AAIDmaapEventJMSProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
@@ -20,7 +20,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
@@ -29,11 +29,11 @@ import org.onap.aai.util.AAIConfig;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
-public class AAIDmaapEventJMSProducer implements MessageProducer {
+public class AAIKafkaEventJMSProducer implements MessageProducer {
private JmsTemplate jmsTemplate;
- public AAIDmaapEventJMSProducer() {
+ public AAIKafkaEventJMSProducer() {
if ("true".equals(AAIConfig.get("aai.jms.enable", "true"))) {
this.jmsTemplate = new JmsTemplate();
String activeMqTcpUrl = System.getProperty("activemq.tcp.url", "tcp://localhost:61547");
diff --git a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
index da70d505..d6a491ef 100644
--- a/aai-core/src/main/java/org/onap/aai/dmaap/MessageProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/MessageProducer.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.aai.dmaap;
+package org.onap.aai.kafka;
import org.json.JSONObject;
diff --git a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
index 598ef231..6f3e8883 100644
--- a/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
+++ b/aai-core/src/main/java/org/onap/aai/util/StoreNotificationEvent.java
@@ -30,13 +30,13 @@ import org.eclipse.persistence.dynamic.DynamicEntity;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
import org.json.JSONException;
import org.json.JSONObject;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
import org.onap.aai.domain.notificationEvent.NotificationEvent;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.introspection.Introspector;
import org.onap.aai.introspection.Loader;
import org.onap.aai.introspection.exceptions.AAIUnknownObjectException;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@@ -59,12 +59,12 @@ public class StoreNotificationEvent {
* Instantiates a new store notification event.
*/
public StoreNotificationEvent(String transactionId, String sourceOfTruth) {
- this.messageProducer = new AAIDmaapEventJMSProducer();
+ this.messageProducer = new AAIKafkaEventJMSProducer();
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
}
- public StoreNotificationEvent(AAIDmaapEventJMSProducer producer, String transactionId, String sourceOfTruth) {
+ public StoreNotificationEvent(AAIKafkaEventJMSProducer producer, String transactionId, String sourceOfTruth) {
this.messageProducer = producer;
this.transactionId = transactionId;
this.sourceOfTruth = sourceOfTruth;
@@ -139,7 +139,7 @@ public class StoreNotificationEvent {
try {
PojoUtils pu = new PojoUtils();
String entityJson = pu.getJsonFromObject(ne);
- sendToDmaapJmsQueue(entityJson);
+ sendToKafkaJmsQueue(entityJson);
return entityJson;
} catch (Exception e) {
throw new AAIException("AAI_7350", e);
@@ -227,7 +227,7 @@ public class StoreNotificationEvent {
marshaller.setProperty(org.eclipse.persistence.jaxb.MarshallerProperties.JSON_WRAPPER_AS_ARRAY_NAME, false);
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, false);
marshaller.marshal(notificationEvent, result);
- this.sendToDmaapJmsQueue(result.toString());
+ this.sendToKafkaJmsQueue(result.toString());
} catch (Exception e) {
throw new AAIException("AAI_7350", e);
@@ -380,7 +380,7 @@ public class StoreNotificationEvent {
notificationEvent.setValue("entity", obj.getUnderlyingObject());
String entityJson = notificationEvent.marshal(false);
- sendToDmaapJmsQueue(entityJson);
+ sendToKafkaJmsQueue(entityJson);
return entityJson;
} catch (JSONException e) {
throw new AAIException("AAI_7350", e);
@@ -389,7 +389,7 @@ public class StoreNotificationEvent {
}
}
- private void sendToDmaapJmsQueue(String entityString) throws JSONException {
+ private void sendToKafkaJmsQueue(String entityString) throws JSONException {
JSONObject entityJsonObject = new JSONObject(entityString);
diff --git a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
index 10d47cd6..6fbc297b 100644
--- a/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
+++ b/aai-core/src/main/java/org/onap/aai/util/delta/DeltaEvents.java
@@ -28,8 +28,8 @@ import java.util.Date;
import java.util.Map;
import org.onap.aai.db.props.AAIProperties;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.onap.aai.dmaap.MessageProducer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.onap.aai.kafka.MessageProducer;
import org.onap.aai.util.AAIConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@ public class DeltaEvents {
private MessageProducer messageProducer;
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas) {
- this(transId, sourceName, schemaVersion, objectDeltas, new AAIDmaapEventJMSProducer());
+ this(transId, sourceName, schemaVersion, objectDeltas, new AAIKafkaEventJMSProducer());
}
public DeltaEvents(String transId, String sourceName, String schemaVersion, Map<String, ObjectDelta> objectDeltas,
diff --git a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java b/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java
deleted file mode 100644
index 078ca963..00000000
--- a/aai-core/src/main/java/org/onap/aai/web/DmaapConfig.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import javax.annotation.PostConstruct;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.onap.aai.dmaap.AAIDmaapEventJMSConsumer;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Profile;
-import org.springframework.http.HttpHeaders;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.web.client.RestTemplate;
-
-@Profile("dmaap")
-@Configuration
-public class DmaapConfig {
-
- @Autowired
- private ApplicationContext ctx;
-
- @Autowired
- @Qualifier("dmaapRestTemplate")
- private RestTemplate dmaapRestTemplate;
-
- @Autowired
- @Qualifier("dmaapHeaders")
- private HttpHeaders dmaapHeaders;
-
- @Value("${jms.bind.address}")
- private String bindAddress;
-
- @PostConstruct
- public void init() {
- System.setProperty("activemq.tcp.url", bindAddress);
- }
-
- @Bean(destroyMethod = "stop")
- public BrokerService brokerService() throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setSchedulerSupport(false);
- broker.start();
-
- return broker;
- }
-
- @Bean(name = "connectionFactory")
- public ActiveMQConnectionFactory activeMQConnectionFactory() {
- return new ActiveMQConnectionFactory(bindAddress);
- }
-
- @Bean
- public CachingConnectionFactory cachingConnectionFactory() {
- return new CachingConnectionFactory(activeMQConnectionFactory());
- }
-
- @Bean(name = "destinationQueue")
- public ActiveMQQueue activeMQQueue() {
- return new ActiveMQQueue("IN_QUEUE");
- }
-
- @Bean
- public JmsTemplate jmsTemplate() {
- JmsTemplate jmsTemplate = new JmsTemplate();
-
- jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
- jmsTemplate.setDefaultDestination(activeMQQueue());
-
- return jmsTemplate;
- }
-
- @Bean
- public AAIDmaapEventJMSProducer jmsProducer() {
- return new AAIDmaapEventJMSProducer();
- }
-
- @Bean(name = "jmsConsumer")
- public AAIDmaapEventJMSConsumer jmsConsumer() throws Exception {
- return new AAIDmaapEventJMSConsumer(ctx.getEnvironment(), dmaapRestTemplate, dmaapHeaders);
- }
-
- @Bean
- public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
-
- DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
-
- messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
- messageListenerContainer.setDestinationName("IN_QUEUE");
- messageListenerContainer.setMessageListener(jmsConsumer());
-
- return messageListenerContainer;
- }
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java b/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java
deleted file mode 100644
index cbd9b105..00000000
--- a/aai-core/src/main/java/org/onap/aai/web/EventClientPublisher.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.aai.web;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Base64;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.web.client.RestTemplate;
-
-@Configuration
-public class EventClientPublisher {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(EventClientPublisher.class);
-
- @Value("${dmaap.ribbon.listOfServers:}")
- private String hosts;
-
- @Value("${dmaap.ribbon.username:}")
- private String username;
-
- @Value("${dmaap.ribbon.password:}")
- private String password;
-
- @Value("${dmaap.ribbon.topic:AAI-EVENT}")
- private String topic;
-
- @Value("${dmaap.ribbon.batchSize:100}")
- private int maxBatchSize;
-
- @Value("${dmaap.ribbon.maxAgeMs:250}")
- private int maxAgeMs;
-
- @Value("${dmaap.ribbon.delayBetweenBatches:100}")
- private int delayBetweenBatches;
-
- @Value("${dmaap.ribbon.protocol:http}")
- private String protocol;
-
- @Value("${dmaap.ribbon.transportType:HTTPNOAUTH}")
- private String tranportType;
-
- @Value("${dmaap.ribbon.contentType:application/json}")
- private String contentType;
-
- @Bean(name = "dmaapRestTemplate")
- public RestTemplate dmaapRestTemplate() {
- return new RestTemplate();
- }
-
- @Bean(name = "dmaapHeaders")
- public HttpHeaders dmaapHeaders() throws UnsupportedEncodingException {
-
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.setContentType(MediaType.APPLICATION_JSON);
-
- if (username != null && password != null) {
-
- if (!StringUtils.EMPTY.equals(username) && !StringUtils.EMPTY.equals(password)) {
-
- byte[] userPass = (username + ":" + password).getBytes("UTF-8");
-
- httpHeaders.set("Authorization", "Basic " + Base64.getEncoder().encodeToString(userPass));
- }
- }
-
- return httpHeaders;
- }
-
-}
diff --git a/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
new file mode 100644
index 00000000..71ae5b6b
--- /dev/null
+++ b/aai-core/src/main/java/org/onap/aai/web/KafkaConfig.java
@@ -0,0 +1,175 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+ package org.onap.aai.web;
+
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import javax.annotation.PostConstruct;
+
+ import org.apache.activemq.ActiveMQConnectionFactory;
+ import org.apache.activemq.broker.BrokerService;
+ import org.apache.activemq.command.ActiveMQQueue;
+ import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.aai.kafka.AAIKafkaEventJMSConsumer;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
+import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.beans.factory.annotation.Value;
+ import org.springframework.context.ApplicationContext;
+ import org.springframework.context.annotation.Bean;
+ import org.springframework.context.annotation.Configuration;
+ import org.springframework.context.annotation.Profile;
+ import org.springframework.jms.connection.CachingConnectionFactory;
+ import org.springframework.jms.core.JmsTemplate;
+ import org.springframework.jms.listener.DefaultMessageListenerContainer;
+ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+ import org.springframework.kafka.core.KafkaTemplate;
+ import org.springframework.kafka.core.ProducerFactory;
+
+ @Profile("kafka")
+ @Configuration
+ public class KafkaConfig {
+
+ @Autowired
+ private ApplicationContext ctx;
+
+
+ @Value("${jms.bind.address}")
+ private String bindAddress;
+
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private String bootstrapServers;
+
+ @Value("${spring.kafka.producer.properties.security.protocol}")
+ private String securityProtocol;
+
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+ private String saslJaasConfig;
+
+ @Value("${spring.kafka.producer.retries}")
+ private Integer retries;
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
+
+ @PostConstruct
+ public void init() {
+ System.setProperty("activemq.tcp.url", bindAddress);
+ }
+
+ @Bean(destroyMethod = "stop")
+ public BrokerService brokerService() throws Exception {
+
+ BrokerService broker = new BrokerService();
+ broker.addConnector(bindAddress);
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setSchedulerSupport(false);
+ broker.start();
+
+ return broker;
+ }
+
+ @Bean(name = "connectionFactory")
+ public ActiveMQConnectionFactory activeMQConnectionFactory() {
+ return new ActiveMQConnectionFactory(bindAddress);
+ }
+
+ @Bean
+ public CachingConnectionFactory cachingConnectionFactory() {
+ return new CachingConnectionFactory(activeMQConnectionFactory());
+ }
+
+ @Bean(name = "destinationQueue")
+ public ActiveMQQueue activeMQQueue() {
+ return new ActiveMQQueue("IN_QUEUE");
+ }
+
+ @Bean
+ public JmsTemplate jmsTemplate() {
+ JmsTemplate jmsTemplate = new JmsTemplate();
+
+ jmsTemplate.setConnectionFactory(activeMQConnectionFactory());
+ jmsTemplate.setDefaultDestination(activeMQQueue());
+
+ return jmsTemplate;
+ }
+
+ @Bean
+ public AAIKafkaEventJMSProducer jmsProducer() {
+ return new AAIKafkaEventJMSProducer();
+ }
+
+ @Bean(name = "jmsConsumer")
+ public AAIKafkaEventJMSConsumer jmsConsumer() throws Exception {
+ return new AAIKafkaEventJMSConsumer(ctx.getEnvironment(),kafkaTemplate());
+ }
+
+ @Bean
+ public DefaultMessageListenerContainer defaultMessageListenerContainer() throws Exception {
+
+ DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
+
+ messageListenerContainer.setConnectionFactory(cachingConnectionFactory());
+ messageListenerContainer.setDestinationName("IN_QUEUE");
+ messageListenerContainer.setMessageListener(jmsConsumer());
+
+ return messageListenerContainer;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() throws Exception {
+ Map<String, Object> props = new HashMap<>();
+ if(bootstrapServers == null){
+ logger.error("Environment Variable " + bootstrapServers + " is missing");
+ throw new Exception("Environment Variable " + bootstrapServers + " is missing");
+ }
+ else{
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+ if(saslJaasConfig == null){
+ logger.info("Not using any authentication for kafka interaction");
+ }
+ else{
+ logger.info("Using authentication provided by kafka interaction");
+ // Strimzi Kafka security properties
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("security.protocol", securityProtocol);
+ props.put("sasl.mechanism", saslMechanism);
+ props.put("sasl.jaas.config", saslJaasConfig);
+ props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(retries));
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
+ }
+
+ return new DefaultKafkaProducerFactory<>(props);
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate() throws Exception {
+ return new KafkaTemplate<>(producerFactory());
+ }
+ }
+ \ No newline at end of file
diff --git a/aai-core/src/main/resources/logback.xml b/aai-core/src/main/resources/logback.xml
index f2b1399f..fc66b32e 100644
--- a/aai-core/src/main/resources/logback.xml
+++ b/aai-core/src/main/resources/logback.xml
@@ -173,14 +173,14 @@
<appender-ref ref="translog" />
</appender>
- <appender name="dmaapAAIEventConsumer"
+ <appender name="kafkaAAIEventConsumer"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
@@ -188,32 +188,32 @@
</encoder>
</appender>
- <appender name="dmaapAAIEventConsumerDebug"
+ <appender name="kafkaAAIEventConsumerDebug"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
<pattern>${eelfLogPattern}</pattern>
</encoder>
</appender>
- <appender name="dmaapAAIEventConsumerMetric"
+ <appender name="kafkaAAIEventConsumerMetric"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
@@ -365,10 +365,10 @@
<appender-ref ref="asyncAUDIT"/>
</logger>
- <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false">
- <appender-ref ref="dmaapAAIEventConsumer" />
- <appender-ref ref="dmaapAAIEventConsumerDebug" />
- <appender-ref ref="dmaapAAIEventConsumerMetric" />
+ <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
+ <appender-ref ref="kafkaAAIEventConsumer" />
+ <appender-ref ref="kafkaAAIEventConsumerDebug" />
+ <appender-ref ref="kafkaAAIEventConsumerMetric" />
</logger>
<logger name="org.apache" level="OFF" />
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
new file mode 100644
index 00000000..c72499c4
--- /dev/null
+++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventJMSConsumerTest.java
@@ -0,0 +1,89 @@
+package org.onap.aai.kafka;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.jms.TextMessage;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.aai.PayloadUtil;
+import org.springframework.core.env.Environment;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.util.ReflectionTestUtils;
+
+@RunWith(MockitoJUnitRunner.class)
+@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
+public class AAIKafkaEventJMSConsumerTest {
+
+ @Mock
+ private Environment environment;
+
+ @Mock
+ private KafkaTemplate<String,String> kafkaTemplate;
+
+ private AAIKafkaEventJMSConsumer aaiKafkaEventJMSConsumer;
+
+ @Before
+ public void setUp(){
+ aaiKafkaEventJMSConsumer = new AAIKafkaEventJMSConsumer(environment,kafkaTemplate);
+ }
+
+ @Test
+ public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
+ throws Exception
+ {
+ TextMessage mockTextMessage = mock(TextMessage.class);
+ String payload = PayloadUtil.getResourcePayload("aai-event.json");
+
+ when(mockTextMessage.getText()).thenReturn(payload);
+ aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
+ verify(kafkaTemplate, times(1)).send(eq("AAI-EVENT"), anyString());
+ }
+
+ @Test
+ public void onMessage_shouldNotSendMessageToKafkaTopic_whenInvalidEventReceived() throws Exception{
+ TextMessage mockTextMessage = mock(TextMessage.class);
+ String payload = PayloadUtil.getResourcePayload("aai-invalid-event.json");
+ when(mockTextMessage.getText()).thenReturn(payload);
+ aaiKafkaEventJMSConsumer.onMessage(mockTextMessage);
+ }
+
+
+ @Test
+ public void onMessage_shouldHandleJSONException() throws Exception {
+ // Arrange
+ AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
+ TextMessage mockTextMessage = mock(TextMessage.class);
+ ReflectionTestUtils.setField(consumer, "kafkaTemplate", null); // Simulate null kafkaTemplate
+
+ // Act
+ consumer.onMessage(mockTextMessage);
+
+ // Assert
+ // Verify that exception is logged
+ }
+
+ @Test
+ public void onMessage_shouldHandleGenericException() throws Exception {
+ // Arrange
+ AAIKafkaEventJMSConsumer consumer = new AAIKafkaEventJMSConsumer(null, kafkaTemplate);
+ TextMessage mockTextMessage = mock(TextMessage.class);
+ when(mockTextMessage.getText()).thenReturn("{\"event-topic\":\"AAI-EVENT\",\"aaiEventPayload\":{}}"); // Valid JSON but missing required fields
+
+ // Act
+ consumer.onMessage(mockTextMessage);
+
+ // Assert
+ // Verify that exception is logged
+ }
+
+}
diff --git a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java
index b4b8810e..a0c3f639 100644
--- a/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java
+++ b/aai-core/src/test/java/org/onap/aai/util/StoreNotificationEventTest.java
@@ -39,27 +39,27 @@ import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.onap.aai.AAISetup;
-import org.onap.aai.dmaap.AAIDmaapEventJMSProducer;
import org.onap.aai.domain.notificationEvent.NotificationEvent.EventHeader;
import org.onap.aai.exceptions.AAIException;
import org.onap.aai.introspection.Introspector;
import org.onap.aai.introspection.Loader;
import org.onap.aai.introspection.ModelType;
+import org.onap.aai.kafka.AAIKafkaEventJMSProducer;
public class StoreNotificationEventTest extends AAISetup {
- private static AAIDmaapEventJMSProducer producer;
+ private static AAIKafkaEventJMSProducer producer;
private static StoreNotificationEvent sne;
@BeforeClass
public static void setUp() {
- producer = Mockito.mock(AAIDmaapEventJMSProducer.class);
+ producer = Mockito.mock(AAIKafkaEventJMSProducer.class);
// sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth");
}
@Before
public void setUpBefore() {
- producer = Mockito.mock(AAIDmaapEventJMSProducer.class);
+ producer = Mockito.mock(AAIKafkaEventJMSProducer.class);
sne = new StoreNotificationEvent(producer, "transiationId", "sourceOfTruth");
}
diff --git a/aai-core/src/test/resources/logback.xml b/aai-core/src/test/resources/logback.xml
index 5d4f7bf3..4c82c0bf 100644
--- a/aai-core/src/test/resources/logback.xml
+++ b/aai-core/src/test/resources/logback.xml
@@ -170,14 +170,14 @@
<appender-ref ref="translog" />
</appender>
- <appender name="dmaapAAIEventConsumer"
+ <appender name="kafkaAAIEventConsumer"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/error.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/error.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/error.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/error.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
@@ -185,32 +185,32 @@
</encoder>
</appender>
- <appender name="dmaapAAIEventConsumerDebug"
+ <appender name="kafkaAAIEventConsumerDebug"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/debug.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/debug.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/debug.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
<pattern>${eelfLogPattern}</pattern>
</encoder>
</appender>
- <appender name="dmaapAAIEventConsumerMetric"
+ <appender name="kafkaAAIEventConsumerMetric"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <File>${logDirectory}/dmaapAAIEventConsumer/metrics.log</File>
+ <File>${logDirectory}/kafkaAAIEventConsumer/metrics.log</File>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${logDirectory}/dmaapAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
+ <fileNamePattern>${logDirectory}/kafkaAAIEventConsumer/metrics.log.%d{yyyy-MM-dd}
</fileNamePattern>
</rollingPolicy>
<encoder class="org.onap.aai.logging.EcompEncoder">
@@ -362,10 +362,10 @@
<appender-ref ref="asyncAUDIT"/>
</logger>
- <logger name="org.onap.aai.dmaap" level="DEBUG" additivity="false">
- <appender-ref ref="dmaapAAIEventConsumer" />
- <appender-ref ref="dmaapAAIEventConsumerDebug" />
- <appender-ref ref="dmaapAAIEventConsumerMetric" />
+ <logger name="org.onap.aai.kafka" level="DEBUG" additivity="false">
+ <appender-ref ref="kafkaAAIEventConsumer" />
+ <appender-ref ref="kafkaAAIEventConsumerDebug" />
+ <appender-ref ref="kafkaAAIEventConsumerMetric" />
</logger>
<logger name="org.apache" level="WARN" />
diff --git a/aai-core/src/test/resources/payloads/resource/aai-event.json b/aai-core/src/test/resources/payloads/resource/aai-event.json
new file mode 100644
index 00000000..0fab96da
--- /dev/null
+++ b/aai-core/src/test/resources/payloads/resource/aai-event.json
@@ -0,0 +1,64 @@
+{
+ "event-topic": "AAI-EVENT",
+ "aaiEventPayload": {
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "object-group",
+ "top-entity-type": "object-group",
+ "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+ "event-type": "AAI-EVENT",
+ "domain": "dev",
+ "action": "UPDATE",
+ "sequence-number": "0",
+ "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+ "source-name": "postman-api",
+ "version": "v28",
+ "timestamp": "20231207-12:14:44:757"
+ },
+ "entity": {
+ "relationship-list": {
+ "relationship": [
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193273958916",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+ }
+ ]
+ },
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193272330241",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+ }
+ ]
+ }
+ ]
+ },
+ "group-name": "Urban",
+ "resource-version": "1701951284582",
+ "group-type": "cell",
+ "object-group-id": "ric_cluster"
+ }
+ }
+} \ No newline at end of file
diff --git a/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json
new file mode 100644
index 00000000..77b2fc1f
--- /dev/null
+++ b/aai-core/src/test/resources/payloads/resource/aai-invalid-event.json
@@ -0,0 +1,64 @@
+{
+ "event-topic": "AAI-INVALID-EVENT",
+ "aaiEventPayload": {
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "object-group",
+ "top-entity-type": "object-group",
+ "entity-link": "/aai/v28/common/object-groups/object-group/ric_cluster",
+ "event-type": "AAI-EVENT",
+ "domain": "dev",
+ "action": "UPDATE",
+ "sequence-number": "0",
+ "id": "23f12123-c326-48a7-b57e-e48746c295ea",
+ "source-name": "postman-api",
+ "version": "v28",
+ "timestamp": "20231207-12:14:44:757"
+ },
+ "entity": {
+ "relationship-list": {
+ "relationship": [
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193273958916",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193273958916",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GU2_84079913"
+ }
+ ]
+ },
+ {
+ "related-to": "cell",
+ "relationship-data": [
+ {
+ "relationship-value": "445611193272330241",
+ "relationship-key": "cell.cell-id"
+ }
+ ],
+ "related-link": "/aai/v28/network/cells/cell/445611193272330241",
+ "relationship-label": "org.onap.relationships.inventory.MemberOf",
+ "related-to-property": [
+ {
+ "property-key": "cell.cell-name",
+ "property-value": "MY6885_M-Schwere-Reiter-Str-440460_GTC2_84003803"
+ }
+ ]
+ }
+ ]
+ },
+ "group-name": "Urban",
+ "resource-version": "1701951284582",
+ "group-type": "cell",
+ "object-group-id": "ric_cluster"
+ }
+ }
+} \ No newline at end of file