aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dcae/common/EventSender.java10
-rw-r--r--src/main/java/org/onap/dcae/common/model/VesEvent.java112
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java46
-rw-r--r--src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java154
-rw-r--r--src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java5
5 files changed, 192 insertions, 135 deletions
diff --git a/src/main/java/org/onap/dcae/common/EventSender.java b/src/main/java/org/onap/dcae/common/EventSender.java
index 1ec918c9..81c463dc 100644
--- a/src/main/java/org/onap/dcae/common/EventSender.java
+++ b/src/main/java/org/onap/dcae/common/EventSender.java
@@ -49,17 +49,17 @@ public class EventSender {
setLoggingContext(vesEvent);
streamIdToDmaapIds.get(vesEvent.getStreamId())
.onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject()))
- .forEach(streamIds -> sendEventsToStreams(vesEvent, streamIds));
+ .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds));
log.debug("Message published" + vesEvent.asJsonObject());
}
log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
metriclog.info("EVENT_PUBLISH_END");
}
- private void sendEventsToStreams(VesEvent vesEvent, String[] streamIdList) {
- for (String streamId : streamIdList) {
- log.info("Invoking publisher for streamId/domain:" + streamId);
- eventPublisher.sendEvent(vesEvent.asJsonObject(), streamId);
+ private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) {
+ for (String dmaapId : dmaapIds) {
+ log.info("Invoking publisher for streamId/domain:" + dmaapId);
+ eventPublisher.sendEvent(vesEvent, dmaapId);
}
}
diff --git a/src/main/java/org/onap/dcae/common/model/VesEvent.java b/src/main/java/org/onap/dcae/common/model/VesEvent.java
index 6c9a8ee2..8e2db80e 100644
--- a/src/main/java/org/onap/dcae/common/model/VesEvent.java
+++ b/src/main/java/org/onap/dcae/common/model/VesEvent.java
@@ -32,14 +32,15 @@ import org.json.JSONObject;
*/
public class VesEvent {
- private static final String EVENT_LITERAL = "event";
+ public static final String VES_UNIQUE_ID = "VESuniqueId";
private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- private static final String VES_UNIQUE_ID = "VESuniqueId";
private static final String DOMAIN = "domain";
private static final String STND_DEFINED_NAMESPACE = "stndDefinedNamespace";
private static final String STND_DEFINED_DOMAIN = "stndDefined";
private static final String STND_DEFINED_FIELDS = "stndDefinedFields";
private static final String SCHEMA_REFERENCE = "schemaReference";
+ private static final String EVENT = "event";
+ private static final String PARTITION_KEY = "sourceName";
private final JSONObject event;
@@ -47,6 +48,45 @@ public class VesEvent {
this.event = event;
}
+ public JsonNode asJsonNode() throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return objectMapper.readTree(event.toString());
+ }
+
+ /**
+ * Returns VES event in form of JSON object.
+ *
+ * @return event in form of json Object
+ */
+ public JSONObject asJsonObject() {
+ return new JSONObject(event.toString());
+ }
+
+ /**
+ * Returns Domain name from VES event.
+ *
+ * @return domain
+ */
+ public String getDomain() {
+ return getEventHeader().getString(DOMAIN);
+ }
+
+ /**
+ * Returns event primary key.
+ * @return a primary key
+ */
+ public String getPK() {
+ return event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
+ }
+
+ /**
+ * Returns schema reference.
+ * @return a schema reference.
+ */
+ public String getSchemaReference() {
+ return getStndDefinedFields().getString(SCHEMA_REFERENCE);
+ }
+
/**
* Returns stream ID from VES event.
*
@@ -63,21 +103,40 @@ public class VesEvent {
}
/**
- * Returns Domain name from VES event.
+ * Returns unique ID of VES event.
*
- * @return domain
+ * @return unique ID
*/
- public String getDomain() {
- return getEventHeader().getString(DOMAIN);
+ public Object getUniqueId() {
+ return event.get(VES_UNIQUE_ID);
}
- public String getSchemaReference() {
- return getStndDefinedFields().getString(SCHEMA_REFERENCE);
+ /**
+ * Checks if type of event is same as given in paramaters.
+ *
+ * @param type name that will be compared with event type
+ * @return true or false depending if type given in parameter is same as VES event type
+ */
+ public boolean hasType(String type) {
+ return this.event.has(type);
+ }
+
+ /**
+ * Remove Json element from event by key.
+ * @param key
+ */
+ public void removeElement(String key) {
+ this.event.remove(key);
+ }
+
+ @Override
+ public String toString() {
+ return event.toString();
}
private JSONObject getStndDefinedFields() {
return event
- .getJSONObject(EVENT_LITERAL)
+ .getJSONObject(EVENT)
.getJSONObject(STND_DEFINED_FIELDS);
}
@@ -97,44 +156,11 @@ public class VesEvent {
private JSONObject getEventHeader() {
return event
- .getJSONObject(EVENT_LITERAL)
+ .getJSONObject(EVENT)
.getJSONObject(COMMON_EVENT_HEADER);
}
private boolean isStdDefinedDomain(String domain) {
return domain.equals(STND_DEFINED_DOMAIN);
}
-
- /**
- * Returns unique ID of VES event.
- *
- * @return unique ID
- */
- public Object getUniqueId() {
- return event.get(VES_UNIQUE_ID);
- }
-
- /**
- * Returns VES event in form of JSON object.
- *
- * @return event in form of json Object
- */
- public JSONObject asJsonObject() {
- return new JSONObject(event.toString());
- }
-
- public JsonNode asJsonNode() throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readTree(event.toString());
- }
-
- /**
- * Checks if type of event is same as given in paramaters.
- *
- * @param type name that will be compared with event type
- * @return true or false depending if type given in parameter is same as VES event type
- */
- public boolean hasType(String type) {
- return this.event.has(type);
- }
}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
index 4e9aabc7..876c391b 100644
--- a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -27,8 +27,8 @@ import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import io.vavr.collection.Map;
import io.vavr.control.Try;
-import org.json.JSONObject;
import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.model.VesEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,10 +41,6 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
*/
public class DMaaPEventPublisher {
private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
- private static final String VES_UNIQUE_ID = "VESuniqueId";
- private static final String EVENT = "event";
- private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- private static final String PARTITION_KEY = "sourceName";
private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
private final DMaaPPublishersCache publishersCache;
private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
@@ -57,45 +53,45 @@ public class DMaaPEventPublisher {
this(new DMaaPPublishersCache(dMaaPConfig));
}
- public void sendEvent(JSONObject event, String domain) {
- clearVesUniqueIdFromEvent(event);
- publishersCache.getPublisher(domain)
+ public void sendEvent(VesEvent vesEvent, String dmaapId){
+ clearVesUniqueIdFromEvent(vesEvent);
+ publishersCache.getPublisher(dmaapId)
.onEmpty(() ->
- log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
- .forEach(publisher -> sendEvent(event, domain, publisher));
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
+ .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
}
- private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
- Try.run(() -> uncheckedSendEvent(event, domain, publisher))
- .onFailure(exc -> closePublisher(event, domain, exc));
+ private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
+ .onFailure(exc -> closePublisher(event, dmaapId, exc));
}
- private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
throws IOException {
-
- String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
- int pendingMsgs = publisher.send(pk, event.toString());
+
+ String pk = event.getPK();
+ int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
log.info("Pending messages count: " + pendingMsgs);
}
- String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
log.info(infoMsg);
outputLogger.info(infoMsg);
}
- private void closePublisher(JSONObject event, String domain, Throwable e) {
+ private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
- event, domain), e);
- publishersCache.closePublisherFor(domain);
+ event, dmaapId), e);
+ publishersCache.closePublisherFor(dmaapId);
}
- private void clearVesUniqueIdFromEvent(JSONObject event) {
- if (event.has(VES_UNIQUE_ID)) {
- String uuid = event.get(VES_UNIQUE_ID).toString();
+ private void clearVesUniqueIdFromEvent(VesEvent event) {
+ if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
+ String uuid = event.getUniqueId().toString();
LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
log.debug("Removing VESuniqueid object from event");
- event.remove(VES_UNIQUE_ID);
+ event.removeElement(VesEvent.VES_UNIQUE_ID);
}
}
}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
index 99505bfd..e4b6fd91 100644
--- a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
+++ b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
@@ -20,6 +20,14 @@
*/
package org.onap.dcae.common.publishing;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.common.model.VesEvent;
+
+import java.io.IOException;
+
import static io.vavr.API.Option;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
@@ -27,66 +35,92 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import java.io.IOException;
-import org.json.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-
public class DMaaPEventPublisherTest {
- private static final String STREAM_ID = "sampleStreamId";
-
- private DMaaPEventPublisher eventPublisher;
- private CambriaBatchingPublisher cambriaPublisher;
- private DMaaPPublishersCache DMaaPPublishersCache;
-
- @Before
- public void setUp() {
- cambriaPublisher = mock(CambriaBatchingPublisher.class);
- DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
- when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
- eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache);
- }
-
- @Test
- public void shouldSendEventToTopic() throws Exception {
- // given
- JSONObject event = new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
-
-
- // when
- eventPublisher.sendEvent(event, STREAM_ID);
-
- // then
- verify(cambriaPublisher).send("dns01cmd004", event.toString());
- }
-
-
- @Test
- public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
- // given
- JSONObject event = new JSONObject(
- "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
-
- // when
- eventPublisher.sendEvent(event, STREAM_ID);
-
- // then
- verify(cambriaPublisher).send("dns01cmd004", new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}").toString());
- }
-
- @Test
- public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
- // given
- JSONObject event = new JSONObject("{}");
- given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
-
- // when
- eventPublisher.sendEvent(event, STREAM_ID);
-
- // then
- verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
- }
+ private static final String STREAM_ID = "sampleStreamId";
+
+ private static final JSONObject EXPECTED_EVENT =
+ new JSONObject(
+ "{\"VESversion\":\"v7\",\"event\":{"
+ + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,"
+ + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
+ + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\","
+ + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
+ + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,"
+ + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\","
+ + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\","
+ + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
+
+ private static final String PARTITION = "dns01cmd004";
+
+ private DMaaPEventPublisher eventPublisher;
+ private CambriaBatchingPublisher cambriaPublisher;
+ private DMaaPPublishersCache DMaaPPublishersCache;
+
+ @Before
+ public void setUp() {
+ cambriaPublisher = mock(CambriaBatchingPublisher.class);
+ DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
+ when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
+ eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache);
+ }
+
+ @Test
+ public void shouldSendEventToTopic() throws Exception {
+ // when
+ eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID);
+
+ // then
+ verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
+ }
+
+ @Test
+ public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
+ // when
+ eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
+
+ // then
+ verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
+ }
+
+ @Test
+ public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
+ // given
+ given(cambriaPublisher.send(anyString(), anyString()))
+ .willThrow(new IOException("Expected exception - test case scenario!"));
+
+ // when
+ eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
+
+ // then
+ verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
+ }
+
+ private VesEvent givenVesEventWithVESUniqueIdField() {
+ return new VesEvent(
+ new JSONObject(
+ "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," +
+ "\"event\":{" +
+ "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
+ "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
+ "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
+ "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
+ "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
+ "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
+ "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
+ }
+
+ private VesEvent givenVesEventWithoutVESuniqueIdField() {
+ return new VesEvent(
+ new JSONObject(
+ "{\"VESversion\":\"v7\"," +
+ "\"event\":{" +
+ "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
+ "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
+ "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
+ "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
+ "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
+ "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
+ "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
+ }
}
diff --git a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
index ce7e09d3..5504ca8e 100644
--- a/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
+++ b/src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
@@ -40,6 +40,7 @@ import org.onap.dcae.common.EventSender;
import org.onap.dcae.common.EventTransformation;
import org.onap.dcae.common.HeaderUtils;
import org.onap.dcae.common.JsonDataLoader;
+import org.onap.dcae.common.model.VesEvent;
import org.onap.dcae.common.validator.StndDefinedDataValidator;
import org.onap.dcae.common.publishing.DMaaPEventPublisher;
import org.slf4j.Logger;
@@ -323,11 +324,11 @@ public class VesRestControllerTest {
assertThat(eventBeforeTransformation).contains("\"version\": \"4.0.1\"");
assertThat(eventBeforeTransformation).contains("\"faultFieldsVersion\": \"4.0\"");
- ArgumentCaptor<JSONObject> argument = ArgumentCaptor.forClass(JSONObject.class);
+ ArgumentCaptor<VesEvent> argument = ArgumentCaptor.forClass(VesEvent.class);
ArgumentCaptor<String> domain = ArgumentCaptor.forClass(String.class);
verify(eventPublisher).sendEvent(argument.capture(), domain.capture());
- final String transformedEvent = argument.getValue().toString();
+ final String transformedEvent = argument.getValue().asJsonObject().toString();
final String eventSentAtTopic = domain.getValue();
// event after transformation