diff options
Diffstat (limited to 'src')
5 files changed, 192 insertions, 135 deletions
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java index 1ec918c9..81c463dc 100644 --- a/src/main/java/org/onap/dcae/common/EventSender.java +++ b/src/main/java/org/onap/dcae/common/EventSender.java @@ -49,17 +49,17 @@ public class EventSender { setLoggingContext(vesEvent); streamIdToDmaapIds.get(vesEvent.getStreamId()) .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject())) - .forEach(streamIds -> sendEventsToStreams(vesEvent, streamIds)); + .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds)); log.debug("Message published" + vesEvent.asJsonObject()); } log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!"); metriclog.info("EVENT_PUBLISH_END"); } - private void sendEventsToStreams(VesEvent vesEvent, String[] streamIdList) { - for (String streamId : streamIdList) { - log.info("Invoking publisher for streamId/domain:" + streamId); - eventPublisher.sendEvent(vesEvent.asJsonObject(), streamId); + private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) { + for (String dmaapId : dmaapIds) { + log.info("Invoking publisher for streamId/domain:" + dmaapId); + eventPublisher.sendEvent(vesEvent, dmaapId); } } diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java index 6c9a8ee2..8e2db80e 100644 --- a/src/main/java/org/onap/dcae/common/model/VesEvent.java +++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java @@ -32,14 +32,15 @@ import org.json.JSONObject; */ public class VesEvent { - private static final String EVENT_LITERAL = "event"; + public static final String VES_UNIQUE_ID = "VESuniqueId"; private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String VES_UNIQUE_ID = "VESuniqueId"; private static final String DOMAIN = "domain"; private static final String STND_DEFINED_NAMESPACE = "stndDefinedNamespace"; private static final String STND_DEFINED_DOMAIN = "stndDefined"; private static final String STND_DEFINED_FIELDS = "stndDefinedFields"; private static final String SCHEMA_REFERENCE = "schemaReference"; + private static final String EVENT = "event"; + private static final String PARTITION_KEY = "sourceName"; private final JSONObject event; @@ -47,6 +48,45 @@ public class VesEvent { this.event = event; } + public JsonNode asJsonNode() throws JsonProcessingException { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readTree(event.toString()); + } + + /** + * Returns VES event in form of JSON object. + * + * @return event in form of json Object + */ + public JSONObject asJsonObject() { + return new JSONObject(event.toString()); + } + + /** + * Returns Domain name from VES event. + * + * @return domain + */ + public String getDomain() { + return getEventHeader().getString(DOMAIN); + } + + /** + * Returns event primary key. + * @return a primary key + */ + public String getPK() { + return event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); + } + + /** + * Returns schema reference. + * @return a schema reference. + */ + public String getSchemaReference() { + return getStndDefinedFields().getString(SCHEMA_REFERENCE); + } + /** * Returns stream ID from VES event. * @@ -63,21 +103,40 @@ public class VesEvent { } /** - * Returns Domain name from VES event. + * Returns unique ID of VES event. * - * @return domain + * @return unique ID */ - public String getDomain() { - return getEventHeader().getString(DOMAIN); + public Object getUniqueId() { + return event.get(VES_UNIQUE_ID); } - public String getSchemaReference() { - return getStndDefinedFields().getString(SCHEMA_REFERENCE); + /** + * Checks if type of event is same as given in paramaters. + * + * @param type name that will be compared with event type + * @return true or false depending if type given in parameter is same as VES event type + */ + public boolean hasType(String type) { + return this.event.has(type); + } + + /** + * Remove Json element from event by key. + * @param key + */ + public void removeElement(String key) { + this.event.remove(key); + } + + @Override + public String toString() { + return event.toString(); } private JSONObject getStndDefinedFields() { return event - .getJSONObject(EVENT_LITERAL) + .getJSONObject(EVENT) .getJSONObject(STND_DEFINED_FIELDS); } @@ -97,44 +156,11 @@ public class VesEvent { private JSONObject getEventHeader() { return event - .getJSONObject(EVENT_LITERAL) + .getJSONObject(EVENT) .getJSONObject(COMMON_EVENT_HEADER); } private boolean isStdDefinedDomain(String domain) { return domain.equals(STND_DEFINED_DOMAIN); } - - /** - * Returns unique ID of VES event. - * - * @return unique ID - */ - public Object getUniqueId() { - return event.get(VES_UNIQUE_ID); - } - - /** - * Returns VES event in form of JSON object. - * - * @return event in form of json Object - */ - public JSONObject asJsonObject() { - return new JSONObject(event.toString()); - } - - public JsonNode asJsonNode() throws JsonProcessingException { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readTree(event.toString()); - } - - /** - * Checks if type of event is same as given in paramaters. - * - * @param type name that will be compared with event type - * @return true or false depending if type given in parameter is same as VES event type - */ - public boolean hasType(String type) { - return this.event.has(type); - } } diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java index 4e9aabc7..876c391b 100644 --- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java +++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java @@ -27,8 +27,8 @@ import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; import io.vavr.collection.Map; import io.vavr.control.Try; -import org.json.JSONObject; import org.onap.dcae.common.VESLogger; +import org.onap.dcae.common.model.VesEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +41,6 @@ import static org.onap.dcae.common.publishing.VavrUtils.f; */ public class DMaaPEventPublisher { private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static final String EVENT = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String PARTITION_KEY = "sourceName"; private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); private final DMaaPPublishersCache publishersCache; private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output"); @@ -57,45 +53,45 @@ public class DMaaPEventPublisher { this(new DMaaPPublishersCache(dMaaPConfig)); } - public void sendEvent(JSONObject event, String domain) { - clearVesUniqueIdFromEvent(event); - publishersCache.getPublisher(domain) + public void sendEvent(VesEvent vesEvent, String dmaapId){ + clearVesUniqueIdFromEvent(vesEvent); + publishersCache.getPublisher(dmaapId) .onEmpty(() -> - log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) - .forEach(publisher -> sendEvent(event, domain, publisher)); + log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent))) + .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher)); } - private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { - Try.run(() -> uncheckedSendEvent(event, domain, publisher)) - .onFailure(exc -> closePublisher(event, domain, exc)); + private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher)) + .onFailure(exc -> closePublisher(event, dmaapId, exc)); } - private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) throws IOException { - - String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString(); - int pendingMsgs = publisher.send(pk, event.toString()); + + String pk = event.getPK(); + int pendingMsgs = publisher.send(pk, event.asJsonObject().toString()); if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { log.info("Pending messages count: " + pendingMsgs); } - String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId); log.info(infoMsg); outputLogger.info(infoMsg); } - private void closePublisher(JSONObject event, String domain, Throwable e) { + private void closePublisher(VesEvent event, String dmaapId, Throwable e) { log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", - event, domain), e); - publishersCache.closePublisherFor(domain); + event, dmaapId), e); + publishersCache.closePublisherFor(dmaapId); } - private void clearVesUniqueIdFromEvent(JSONObject event) { - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); + private void clearVesUniqueIdFromEvent(VesEvent event) { + if (event.hasType(VesEvent.VES_UNIQUE_ID)) { + String uuid = event.getUniqueId().toString(); LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); + event.removeElement(VesEvent.VES_UNIQUE_ID); } } } diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java index 99505bfd..e4b6fd91 100644 --- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java +++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java @@ -20,6 +20,14 @@ */ package org.onap.dcae.common.publishing; +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.common.model.VesEvent; + +import java.io.IOException; + import static io.vavr.API.Option; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; @@ -27,66 +35,92 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import java.io.IOException; -import org.json.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; - public class DMaaPEventPublisherTest { - private static final String STREAM_ID = "sampleStreamId"; - - private DMaaPEventPublisher eventPublisher; - private CambriaBatchingPublisher cambriaPublisher; - private DMaaPPublishersCache DMaaPPublishersCache; - - @Before - public void setUp() { - cambriaPublisher = mock(CambriaBatchingPublisher.class); - DMaaPPublishersCache = mock(DMaaPPublishersCache.class); - when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); - eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache); - } - - @Test - public void shouldSendEventToTopic() throws Exception { - // given - JSONObject event = new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); - - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(cambriaPublisher).send("dns01cmd004", event.toString()); - } - - - @Test - public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { - // given - JSONObject event = new JSONObject( - "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(cambriaPublisher).send("dns01cmd004", new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}").toString()); - } - - @Test - public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { - // given - JSONObject event = new JSONObject("{}"); - given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); - - // when - eventPublisher.sendEvent(event, STREAM_ID); - - // then - verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); - } + private static final String STREAM_ID = "sampleStreamId"; + + private static final JSONObject EXPECTED_EVENT = + new JSONObject( + "{\"VESversion\":\"v7\",\"event\":{" + + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," + + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," + + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3," + + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\"," + + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," + + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); + + private static final String PARTITION = "dns01cmd004"; + + private DMaaPEventPublisher eventPublisher; + private CambriaBatchingPublisher cambriaPublisher; + private DMaaPPublishersCache DMaaPPublishersCache; + + @Before + public void setUp() { + cambriaPublisher = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache = mock(DMaaPPublishersCache.class); + when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); + eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache); + } + + @Test + public void shouldSendEventToTopic() throws Exception { + // when + eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID); + + // then + verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString()); + } + + @Test + public void shouldRemoveInternalVESUIDBeforeSending() throws Exception { + // when + eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID); + + // then + verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString()); + } + + @Test + public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { + // given + given(cambriaPublisher.send(anyString(), anyString())) + .willThrow(new IOException("Expected exception - test case scenario!")); + + // when + eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID); + + // then + verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); + } + + private VesEvent givenVesEventWithVESUniqueIdField() { + return new VesEvent( + new JSONObject( + "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," + + "\"event\":{" + + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," + + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," + + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," + + "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," + + "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," + + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}")); + } + + private VesEvent givenVesEventWithoutVESuniqueIdField() { + return new VesEvent( + new JSONObject( + "{\"VESversion\":\"v7\"," + + "\"event\":{" + + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," + + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," + + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," + + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," + + "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," + + "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," + + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}")); + } } diff --git a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java index ce7e09d3..5504ca8e 100644 --- a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java +++ b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java @@ -40,6 +40,7 @@ import org.onap.dcae.common.EventSender; import org.onap.dcae.common.EventTransformation; import org.onap.dcae.common.HeaderUtils; import org.onap.dcae.common.JsonDataLoader; +import org.onap.dcae.common.model.VesEvent; import org.onap.dcae.common.validator.StndDefinedDataValidator; import org.onap.dcae.common.publishing.DMaaPEventPublisher; import org.slf4j.Logger; @@ -323,11 +324,11 @@ public class VesRestControllerTest { assertThat(eventBeforeTransformation).contains("\"version\": \"4.0.1\""); assertThat(eventBeforeTransformation).contains("\"faultFieldsVersion\": \"4.0\""); - ArgumentCaptor<JSONObject> argument = ArgumentCaptor.forClass(JSONObject.class); + ArgumentCaptor<VesEvent> argument = ArgumentCaptor.forClass(VesEvent.class); ArgumentCaptor<String> domain = ArgumentCaptor.forClass(String.class); verify(eventPublisher).sendEvent(argument.capture(), domain.capture()); - final String transformedEvent = argument.getValue().toString(); + final String transformedEvent = argument.getValue().asJsonObject().toString(); final String eventSentAtTopic = domain.getValue(); // event after transformation |