diff options
Diffstat (limited to 'aai-core/src')
3 files changed, 156 insertions, 21 deletions
diff --git a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java index 4e948772..1ed35424 100644 --- a/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java +++ b/aai-core/src/main/java/org/onap/aai/kafka/AAIKafkaEventJMSProducer.java @@ -20,29 +20,45 @@ * ============LICENSE_END========================================================= */ -package org.onap.aai.kafka; + package org.onap.aai.kafka; -import org.json.JSONObject; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.stereotype.Service; + import org.json.JSONObject; + import org.onap.aai.config.SpringContextAware; + import org.onap.aai.domain.notificationEvent.NotificationEvent; + import org.onap.aai.util.AAIConfig; + import org.springframework.jms.core.JmsTemplate; + import org.springframework.stereotype.Service; -import lombok.RequiredArgsConstructor; + import com.fasterxml.jackson.core.JsonProcessingException; + import com.fasterxml.jackson.databind.ObjectMapper; + import com.fasterxml.jackson.databind.json.JsonMapper; -@Service -@RequiredArgsConstructor -public class AAIKafkaEventJMSProducer implements MessageProducer { + import lombok.RequiredArgsConstructor; + import lombok.extern.slf4j.Slf4j; - @Value("${aai.events.enabled:true}") private boolean eventsEnabled; - private final JmsTemplate jmsTemplate; + @Slf4j + @Service + @RequiredArgsConstructor + public class AAIKafkaEventJMSProducer implements MessageProducer { - public void sendMessageToDefaultDestination(String msg) { - if (eventsEnabled) { - jmsTemplate.convertAndSend(msg); - } - } + private boolean eventsEnabled = "true".equals(AAIConfig.get("aai.jms.enable", "true")); + private JmsTemplate jmsTemplate; + private static final ObjectMapper mapper = new JsonMapper(); - public void sendMessageToDefaultDestination(JSONObject finalJson) { - sendMessageToDefaultDestination(finalJson.toString()); - } -} + public AAIKafkaEventJMSProducer(JmsTemplate jmsTemplate) { + this.jmsTemplate = jmsTemplate; + } + + public void sendMessageToDefaultDestination(String msg) { + if (eventsEnabled) { + if(jmsTemplate == null) { + this.jmsTemplate = SpringContextAware.getBean(JmsTemplate.class); + } + jmsTemplate.convertAndSend(msg); + } + } + + public void sendMessageToDefaultDestination(JSONObject finalJson) { + sendMessageToDefaultDestination(finalJson.toString()); + } + } diff --git a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java index 0f5f47ad..56a6a857 100644 --- a/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java +++ b/aai-core/src/test/java/org/onap/aai/kafka/AAIKafkaEventIntegrationTest.java @@ -20,16 +20,38 @@ package org.onap.aai.kafka; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriInfo; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mock; import org.onap.aai.AAISetup; import org.onap.aai.PayloadUtil; +import org.onap.aai.exceptions.AAIException; +import org.onap.aai.introspection.Introspector; +import org.onap.aai.introspection.Loader; +import org.onap.aai.parsers.query.QueryParser; +import org.onap.aai.rest.db.DBRequest; +import org.onap.aai.restcore.HttpMethod; +import org.onap.aai.serialization.engines.TransactionalGraphEngine; +import org.onap.aai.setup.SchemaVersion; import org.skyscreamer.jsonassert.JSONAssert; import org.skyscreamer.jsonassert.JSONCompareMode; import org.springframework.beans.factory.annotation.Autowired; @@ -41,8 +63,10 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.TestPropertySource; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +@Ignore @Slf4j @ActiveProfiles("kafka") @Import(KafkaTestConfiguration.class) @@ -66,6 +90,18 @@ public class AAIKafkaEventIntegrationTest extends AAISetup { @Autowired private ConsumerFactory<String, String> consumerFactory; + @Mock UriInfo uriInfoMock; + @Mock MultivaluedMap<String, String> queryParamsMock; + @Mock HttpHeaders headersMock; + + @Before + public void setup() { + when(headersMock.getAcceptableMediaTypes()).thenReturn(Collections.singletonList(MediaType.APPLICATION_JSON_TYPE)); + when(uriInfoMock.getQueryParameters(anyBoolean())).thenReturn(queryParamsMock); + when(queryParamsMock.getFirst("depth")).thenReturn("0"); + when(headersMock.getRequestHeader("aai-request-context")).thenReturn(null); + } + @Test public void onMessage_shouldSendMessageToKafkaTopic_whenAAIEventReceived() throws Exception { @@ -84,4 +120,43 @@ public class AAIKafkaEventIntegrationTest extends AAISetup { }); } + @Test + @Ignore + // only works when aai.jms.enable=true in aaiconfig.properties + public void thatEventsAreBeingCreated() throws AAIException, IOException { + Consumer<String, String> consumer = consumerFactory.createConsumer(); + consumer.subscribe(Collections.singletonList("AAI-EVENT")); + + traversalUriHttpEntry.setHttpEntryProperties(new SchemaVersion("v14")); + String pserverUri = "/aai/v14/cloud-infrastructure/pservers/pserver/pserver1"; + String entity = new String(Files.readAllBytes(Paths.get("src/test/resources/payloads/templates/pserver.json"))).replace("${hostname}", "pserver1"); + DBRequest dbRequest = createDBRequest(pserverUri, entity); + List<DBRequest> dbRequests = new ArrayList<>(); + dbRequests.add(dbRequest); + + traversalUriHttpEntry.process(dbRequests, "test"); + + ConsumerRecords<String, String> consumerRecords = KafkaTestUtils.getRecords(consumer, 100000); + assertFalse(consumerRecords.isEmpty()); + String expectedResponse = PayloadUtil.getExpectedPayload("pserver-event.json"); + + consumerRecords.forEach(consumerRecord -> { + JSONAssert.assertEquals(expectedResponse, consumerRecord.value(), JSONCompareMode.LENIENT); + }); + } + + @SneakyThrows + private DBRequest createDBRequest(String uri, String entity) { + TransactionalGraphEngine dbEngine = traversalUriHttpEntry.getDbEngine(); + Loader loader = traversalUriHttpEntry.getLoader(); + URI uriObject = new URI(uri); + QueryParser uriQuery = dbEngine.getQueryBuilder().createQueryFromURI(uriObject); + String objName = uriQuery.getResultType(); + Introspector obj = loader.unmarshal(objName, entity, + org.onap.aai.restcore.MediaType.getEnum("application/json")); + return new DBRequest.Builder(HttpMethod.PUT, uriObject, uriQuery, obj, headersMock, uriInfoMock, "someTransaction") + .rawRequestContent(entity) + .build(); + } + } diff --git a/aai-core/src/test/resources/payloads/expected/pserver-event.json b/aai-core/src/test/resources/payloads/expected/pserver-event.json new file mode 100644 index 00000000..11c1a100 --- /dev/null +++ b/aai-core/src/test/resources/payloads/expected/pserver-event.json @@ -0,0 +1,44 @@ +{ + "cambria.partition": "AAI", + "event-header": { + "severity": "NORMAL", + "entity-type": "pserver", + "top-entity-type": "pserver", + "entity-link": "/aai/v14/cloud-infrastructure/pservers/pserver/pserver1", + "event-type": "AAI-EVENT", + "domain": "devINT1", + "action": "CREATE", + "sequence-number": "0", + "id": "someTransaction", + "source-name": "test", + "version": "v14", + }, + "entity": { + "ptnii-equip-name": "example-ptnii-equip-name-val-36969", + "ipaddress-v6-loopback-0": "example-ipaddress-v6-loopback0-val-17856", + "equip-vendor": "example-equip-vendor-val-37452", + "purpose": "example-purpose-val-90218", + "pserver-selflink": "example-pserver-selflink-val-10125", + "number-of-cpus": 62220, + "ipaddress-v6-aim": "example-ipaddress-v6-aim-val-6210", + "pserver-name2": "example-pserver-name2-val-53802", + "hostname": "pserver1", + "inv-status": "example-inv-status-val-3682", + "disk-in-gigabytes": 872, + "equip-type": "example-equip-type-val-22986", + "fqdn": "example-fqdn-val-33429", + "serial-number": "example-serial-number-val-12010", + "ipaddress-v6-oam": "example-ipaddress-v6-oam-val-40977", + "pserver-id": "example-pserver-id-val-82142", + "prov-status": "example-prov-status-val-11642", + "ipv4-oam-address": "example-ipv4-oam-address-val-3155", + "ipaddress-v4-loopback-0": "example-ipaddress-v4-loopback0-val-77686", + "equip-model": "example-equip-model-val-14665", + "in-maint": true, + "ram-in-megabytes": 35331, + "ipaddress-v4-aim": "example-ipaddress-v4-aim-val-33665", + "management-option": "example-management-option-val-91111", + "internet-topology": "example-internet-topology-val-56425", + "host-profile": "example-host-profile-val-36247" + } +} |