summaryrefslogtreecommitdiffstats
path: root/aai-core
diff options
context:
space:
mode:
Diffstat (limited to 'aai-core')
-rw-r--r--aai-core/pom.xml2
-rw-r--r--aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java56
-rw-r--r--aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java77
-rw-r--r--aai-core/src/test/resources/payloads/expected/pserver-event.json44
4 files changed, 157 insertions, 22 deletions
diff --git a/aai-core/pom.xml b/aai-core/pom.xml
index d2aed3a0..2f5908b4 100644
--- a/aai-core/pom.xml
+++ b/aai-core/pom.xml
@@ -26,7 +26,7 @@ limitations under the License.
<parent>
<groupId>org.onap.aai.aai-common</groupId>
<artifactId>aai-parent</artifactId>
- <version>1.14.6-SNAPSHOT</version>
+ <version>1.14.7-SNAPSHOT</version>
<relativePath>../aai-parent/pom.xml</relativePath>
</parent>
<artifactId>aai-core</artifactId>
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
index 4e948772..1ed35424 100644
--- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
+++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java
@@ -20,29 +20,45 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.aai.kafka;
+ package org.onap.aai.kafka;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.stereotype.Service;
+ import org.json.JSONObject;
+ import org.onap.aai.config.SpringContextAware;
+ import org.onap.aai.domain.notificationEvent.NotificationEvent;
+ import org.onap.aai.util.AAIConfig;
+ import org.springframework.jms.core.JmsTemplate;
+ import org.springframework.stereotype.Service;
-import lombok.RequiredArgsConstructor;
+ import com.fasterxml.jackson.core.JsonProcessingException;
+ import com.fasterxml.jackson.databind.ObjectMapper;
+ import com.fasterxml.jackson.databind.json.JsonMapper;
-@Service
-@RequiredArgsConstructor
-public class AAIKafkaEventJMSProducer implements MessageProducer {
+ import lombok.RequiredArgsConstructor;
+ import lombok.extern.slf4j.Slf4j;
- @Value("${aai.events.enabled:true}") private boolean eventsEnabled;
- private final JmsTemplate jmsTemplate;
+ @Slf4j
+ @Service
+ @RequiredArgsConstructor
+ public class AAIKafkaEventJMSProducer implements MessageProducer {
- public void sendMessageToDefaultDestination(String msg) {
- if (eventsEnabled) {
- jmsTemplate.convertAndSend(msg);
- }
- }
+ private boolean eventsEnabled = "true".equals(AAIConfig.get("aai.jms.enable", "true"));
+ private JmsTemplate jmsTemplate;
+ private static final ObjectMapper mapper = new JsonMapper();
- public void sendMessageToDefaultDestination(JSONObject finalJson) {
- sendMessageToDefaultDestination(finalJson.toString());
- }
-}
+ public AAIKafkaEventJMSProducer(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
+ }
+
+ public void sendMessageToDefaultDestination(String msg) {
+ if (eventsEnabled) {
+ if(jmsTemplate == null) {
+ this.jmsTemplate = SpringContextAware.getBean(JmsTemplate.class);
+ }
+ jmsTemplate.convertAndSend(msg);
+ }
+ }
+
+ public void sendMessageToDefaultDestination(JSONObject finalJson) {
+ sendMessageToDefaultDestination(finalJson.toString());
+ }
+ }
diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
index 0f5f47ad..56a6a857 100644
--- a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
+++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java
@@ -20,16 +20,38 @@
package org.onap.aai.kafka;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriInfo;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.onap.aai.AAISetup;
import org.onap.aai.PayloadUtil;
+import org.onap.aai.exceptions.AAIException;
+import org.onap.aai.introspection.Introspector;
+import org.onap.aai.introspection.Loader;
+import org.onap.aai.parsers.query.QueryParser;
+import org.onap.aai.rest.db.DBRequest;
+import org.onap.aai.restcore.HttpMethod;
+import org.onap.aai.serialization.engines.TransactionalGraphEngine;
+import org.onap.aai.setup.SchemaVersion;
import org.skyscreamer.jsonassert.JSONAssert;
import org.skyscreamer.jsonassert.JSONCompareMode;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,8 +63,10 @@ import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+@Ignore
@Slf4j
@ActiveProfiles("kafka")
@Import(KafkaTestConfiguration.class)
@@ -66,6 +90,18 @@ public class AAIKafkaEventIntegrationTest extends AAISetup {
@Autowired
private ConsumerFactory<String, String> consumerFactory;
+ @Mock UriInfo uriInfoMock;
+ @Mock MultivaluedMap<String, String> queryParamsMock;
+ @Mock HttpHeaders headersMock;
+
+ @Before
+ public void setup() {
+ when(headersMock.getAcceptableMediaTypes()).thenReturn(Collections.singletonList(MediaType.APPLICATION_JSON_TYPE));
+ when(uriInfoMock.getQueryParameters(anyBoolean())).thenReturn(queryParamsMock);
+ when(queryParamsMock.getFirst("depth")).thenReturn("0");
+ when(headersMock.getRequestHeader("aai-request-context")).thenReturn(null);
+ }
+
@Test
public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived()
throws Exception {
@@ -84,4 +120,43 @@ public class AAIKafkaEventIntegrationTest extends AAISetup {
});
}
+ @Test
+ @Ignore
+ // only works when aai.jms.enable=true in aaiconfig.properties
+ public void thatEventsAreBeingCreated() throws AAIException, IOException {
+ Consumer<String, String> consumer = consumerFactory.createConsumer();
+ consumer.subscribe(Collections.singletonList("AAI-EVENT"));
+
+ traversalUriHttpEntry.setHttpEntryProperties(new SchemaVersion("v14"));
+ String pserverUri = "/aai/v14/cloud-infrastructure/pservers/pserver/pserver1";
+ String entity = new String(Files.readAllBytes(Paths.get("src/test/resources/payloads/templates/pserver.json"))).replace("${hostname}", "pserver1");
+ DBRequest dbRequest = createDBRequest(pserverUri, entity);
+ List<DBRequest> dbRequests = new ArrayList<>();
+ dbRequests.add(dbRequest);
+
+ traversalUriHttpEntry.process(dbRequests, "test");
+
+ ConsumerRecords<String, String> consumerRecords = KafkaTestUtils.getRecords(consumer, 100000);
+ assertFalse(consumerRecords.isEmpty());
+ String expectedResponse = PayloadUtil.getExpectedPayload("pserver-event.json");
+
+ consumerRecords.forEach(consumerRecord -> {
+ JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.LENIENT);
+ });
+ }
+
+ @SneakyThrows
+ private DBRequest createDBRequest(String uri, String entity) {
+ TransactionalGraphEngine dbEngine = traversalUriHttpEntry.getDbEngine();
+ Loader loader = traversalUriHttpEntry.getLoader();
+ URI uriObject = new URI(uri);
+ QueryParser uriQuery = dbEngine.getQueryBuilder().createQueryFromURI(uriObject);
+ String objName = uriQuery.getResultType();
+ Introspector obj = loader.unmarshal(objName, entity,
+ org.onap.aai.restcore.MediaType.getEnum("application/json"));
+ return new DBRequest.Builder(HttpMethod.PUT, uriObject, uriQuery, obj, headersMock, uriInfoMock, "someTransaction")
+ .rawRequestContent(entity)
+ .build();
+ }
+
}
diff --git a/aai-core/src/test/resources/payloads/expected/pserver-event.json b/aai-core/src/test/resources/payloads/expected/pserver-event.json
new file mode 100644
index 00000000..11c1a100
--- /dev/null
+++ b/aai-core/src/test/resources/payloads/expected/pserver-event.json
@@ -0,0 +1,44 @@
+{
+ "cambria.partition": "AAI",
+ "event-header": {
+ "severity": "NORMAL",
+ "entity-type": "pserver",
+ "top-entity-type": "pserver",
+ "entity-link": "/aai/v14/cloud-infrastructure/pservers/pserver/pserver1",
+ "event-type": "AAI-EVENT",
+ "domain": "devINT1",
+ "action": "CREATE",
+ "sequence-number": "0",
+ "id": "someTransaction",
+ "source-name": "test",
+ "version": "v14",
+ },
+ "entity": {
+ "ptnii-equip-name": "example-ptnii-equip-name-val-36969",
+ "ipaddress-v6-loopback-0": "example-ipaddress-v6-loopback0-val-17856",
+ "equip-vendor": "example-equip-vendor-val-37452",
+ "purpose": "example-purpose-val-90218",
+ "pserver-selflink": "example-pserver-selflink-val-10125",
+ "number-of-cpus": 62220,
+ "ipaddress-v6-aim": "example-ipaddress-v6-aim-val-6210",
+ "pserver-name2": "example-pserver-name2-val-53802",
+ "hostname": "pserver1",
+ "inv-status": "example-inv-status-val-3682",
+ "disk-in-gigabytes": 872,
+ "equip-type": "example-equip-type-val-22986",
+ "fqdn": "example-fqdn-val-33429",
+ "serial-number": "example-serial-number-val-12010",
+ "ipaddress-v6-oam": "example-ipaddress-v6-oam-val-40977",
+ "pserver-id": "example-pserver-id-val-82142",
+ "prov-status": "example-prov-status-val-11642",
+ "ipv4-oam-address": "example-ipv4-oam-address-val-3155",
+ "ipaddress-v4-loopback-0": "example-ipaddress-v4-loopback0-val-77686",
+ "equip-model": "example-equip-model-val-14665",
+ "in-maint": true,
+ "ram-in-megabytes": 35331,
+ "ipaddress-v4-aim": "example-ipaddress-v4-aim-val-33665",
+ "management-option": "example-management-option-val-91111",
+ "internet-topology": "example-internet-topology-val-56425",
+ "host-profile": "example-host-profile-val-36247"
+ }
+}