aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml8
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java69
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java96
-rw-r--r--src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java213
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java143
-rw-r--r--src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java90
-rw-r--r--src/test/java/org/onap/dcae/vestest/TestEventProcessor.java64
-rw-r--r--src/test/resources/test_collector_ip_op.properties156
8 files changed, 577 insertions, 262 deletions
diff --git a/pom.xml b/pom.xml
index 04a0d3a9..9540d409 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,6 +1,7 @@
<!--
================================================================================
Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+Copyright (c) 2018 Nokia. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -137,6 +138,13 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
<version>1.10</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.18.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java
new file mode 100644
index 00000000..35d3e317
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java
@@ -0,0 +1,69 @@
+package org.onap.dcae.commonFunction;
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CambriaPublisherFactory {
+
+ private static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class);
+
+ public CambriaBatchingPublisher createCambriaPublisher(String streamId)
+ throws MalformedURLException, GeneralSecurityException {
+ String authpwd = null;
+ String ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamId + ".cambria.url");
+
+ if (ueburl == null) {
+ ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamId + ".cambria.hosts");
+ }
+ String topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+ .getKeyValue(streamId + ".cambria.topic");
+ String authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+ .getKeyValue(streamId + ".basicAuthUsername");
+
+ if (authuser != null) {
+ authpwd = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+ .get(streamId + ".basicAuthPassword");
+ }
+
+ if ((authuser != null) && (authpwd != null)) {
+ log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd));
+ return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps()
+ .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5)
+ // .logTo(log)
+ // .limitBatch(100, 10)
+ .build();
+ } else {
+ log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
+ return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic)
+ // .logTo(log)
+ .logSendFailuresAfter(5)
+ // .limitBatch(100, 10)
+ .build();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java
new file mode 100644
index 00000000..c86a3742
--- /dev/null
+++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java
@@ -0,0 +1,96 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class DmaapPublishers {
+
+ private static Logger log = LoggerFactory.getLogger(DmaapPublishers.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishers;
+
+ private DmaapPublishers(
+ LoadingCache<String, CambriaBatchingPublisher> publishers) {
+ this.publishers = publishers;
+ }
+
+ static DmaapPublishers create() {
+ return create(new CambriaPublisherFactory());
+ }
+
+ static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) {
+ final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder()
+// .expireAfterAccess(10, TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String, CambriaBatchingPublisher>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ if (notification.getValue() != null) {
+ onCacheItemInvalidated(notification.getValue());
+ }
+ }
+ })
+ .build(new CacheLoader<String, CambriaBatchingPublisher>() {
+ @Override
+ public CambriaBatchingPublisher load(String streamId)
+ throws MalformedURLException, GeneralSecurityException {
+ try {
+ return publisherFactory.createCambriaPublisher(streamId);
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " "
+ + e.getMessage());
+ throw e;
+ }
+ }
+ });
+ return new DmaapPublishers(cache);
+ }
+
+ public CambriaBatchingPublisher getByStreamId(String streamId) {
+ return publishers.getUnchecked(streamId);
+ }
+
+ public void closeByStreamId(String streamId) {
+ publishers.invalidate(streamId);
+ }
+
+ private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) {
+ try {
+ final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+ if (!stuck.isEmpty()) {
+ log.error(stuck.size() + " messages unsent");
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Caught Exception on Close event: {}", e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
index 49221418..474424a7 100644
--- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
+++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
@@ -1,15 +1,16 @@
/*-
* ============LICENSE_START=======================================================
- * PROJECT
+ * org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,159 +22,73 @@
package org.onap.dcae.commonFunction;
import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
-import com.google.gson.JsonArray;
-
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
public class EventPublisherHash {
- private static final String VES_UNIQUE_ID = "VESuniqueId";
- private static EventPublisherHash instance;
- private CambriaBatchingPublisher pub;
-
- private String streamid = "";
- private String ueburl = "";
- private String topic = "";
- private String authuser = "";
- private String authpwd = "";
-
- private static Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
-
- protected static HashMap<String, CambriaBatchingPublisher> map = new HashMap<String, CambriaBatchingPublisher>();
-
-
-
- public CambriaBatchingPublisher Dmaaphash(String newstreamid) {
- pub = null;
- streamid = newstreamid;
- if (map != null && map.containsKey(streamid)){
- pub = map.get(streamid);
-
- }
- else
- {
-
- try {
- ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".cambria.url");
-
- if (ueburl == null) {
- ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".cambria.hosts");
- }
- topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
- .getKeyValue(streamid + ".cambria.topic");
- authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
- .getKeyValue(streamid + ".basicAuthUsername");
-
- if (authuser != null) {
- authpwd = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
- .get(streamid + ".basicAuthPassword");
- }
-
-
- if ((authuser != null) && (authpwd != null)) {
- log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd));
- pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps()
- .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5)
- // .logTo(log)
- // .limitBatch(100, 10)
- .build();
- } else {
-
- log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
- pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic)
- // .logTo(log)
- .logSendFailuresAfter(5)
- // .limitBatch(100, 10)
- .build();
-
- }
-
- map.put(streamid, pub);
-
-
- } catch ( Exception e) {
- log.error("CambriaClientBuilders connection reader exception : streamID - " + newstreamid + " " + e.getMessage());
- }
-
- }
- return pub;
-
- }
-
- /**
- * Returns event publisher
- * @return event publisher
- */
- public static synchronized EventPublisherHash getInstance() {
- if (instance == null) {
- instance = new EventPublisherHash();
- }
- return instance;
-
- }
-
-
- public synchronized void sendEvent(JSONObject event, String streamid) {
-
- log.debug("EventPublisher.sendEvent: instance for publish is ready");
-
- if (event.has(VES_UNIQUE_ID)) {
- String uuid = event.get(VES_UNIQUE_ID).toString();
- LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
- localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
- log.debug("Removing VESuniqueid object from event");
- event.remove(VES_UNIQUE_ID);
- }
-
- try {
-
-
- int pendingMsgs = Dmaaphash(streamid).send("MyPartitionKey", event.toString());
- // this.wait(2000);
-
- if (pendingMsgs > 100) {
- log.info("Pending Message Count=" + pendingMsgs);
- }
-
- log.info("pub.send invoked - no error");
- //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event));
- CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
- } catch (IOException | IllegalArgumentException e) {
- log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e);
- Dmaaphash(streamid).close();
- map.remove(streamid);
- }
-
- }
-
- public synchronized void closePublisher() {
-
- try {
- if (pub != null) {
-
- final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
- if (!stuck.isEmpty()) {
- log.error(stuck.size() + " messages unsent");
- }
- }
- } catch (InterruptedException | IOException e) {
- log.error("Caught Exception on Close event: {}", e);
- }
-
- }
+ private static final String VES_UNIQUE_ID = "VESuniqueId";
+ private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
+ private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create());
+ private final DmaapPublishers dmaapPublishers;
+
+ /**
+ * Returns event publisher
+ *
+ * @return event publisher
+ */
+ public static EventPublisherHash getInstance() {
+ return instance;
+ }
+
+ @VisibleForTesting
+ EventPublisherHash(DmaapPublishers dmaapPublishers) {
+ this.dmaapPublishers = dmaapPublishers;
+ }
+
+ public void sendEvent(JSONObject event, String streamid) {
+ log.debug("EventPublisher.sendEvent: instance for publish is ready");
+ clearVesUniqueId(event);
+
+ try {
+ sendEventUsingCachedPublisher(streamid, event);
+ } catch (IOException | IllegalArgumentException e) {
+ log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e);
+ dmaapPublishers.closeByStreamId(streamid);
+ }
+ }
+
+ private void clearVesUniqueId(JSONObject event) {
+ if (event.has(VES_UNIQUE_ID)) {
+ String uuid = event.get(VES_UNIQUE_ID).toString();
+ LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+ localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+ log.debug("Removing VESuniqueid object from event");
+ event.remove(VES_UNIQUE_ID);
+ }
+ }
+
+ private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException {
+ int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString());
+ // this.wait(2000);
+
+ if (pendingMsgs > 100) {
+ log.info("Pending Message Count=" + pendingMsgs);
+ }
+
+ log.info("pub.send invoked - no error");
+ //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event));
+ CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
+ }
+
+ @VisibleForTesting
+ public CambriaBatchingPublisher getDmaapPublisher(String streamId) {
+ return dmaapPublishers.getByStreamId(streamId);
+ }
}
-
diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
new file mode 100644
index 00000000..782b7c25
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.concurrent.TimeUnit;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class DmaapPublishersTest {
+
+ @Mock
+ private CambriaPublisherFactory publisherFactory;
+ @Mock
+ private CambriaBatchingPublisher cambriaPublisher;
+ private DmaapPublishers cut;
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setUp() throws MalformedURLException, GeneralSecurityException {
+ given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher);
+ cut = DmaapPublishers.create(publisherFactory);
+ }
+
+ @Test
+ public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException {
+ // given
+ String streamId = "sampleStream";
+
+ // when
+ CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId);
+ CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId);
+
+ // then
+ verify(publisherFactory, times(1)).createCambriaPublisher(streamId);
+ assertSame("should return same instance", firstPublisher, secondPublisher);
+ }
+
+ @Test
+ public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException {
+ // given
+ MalformedURLException exception = new MalformedURLException();
+ given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception);
+ expectedException.expect(allOf(
+ instanceOf(UncheckedExecutionException.class),
+ causeIsInstanceOf(exception.getClass())));
+
+ // when
+ cut.getByStreamId("a stream");
+
+ // then
+ // exception should have been thrown
+ }
+
+ @Test
+ public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException {
+ // given
+ String streamId = "sampleStream";
+ given(cambriaPublisher.close(anyLong(), any(TimeUnit.class)))
+ .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg")));
+
+ // when
+ CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
+ cut.closeByStreamId(streamId);
+
+ // then
+ assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
+ verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException {
+ // given
+ String streamId = "sampleStream";
+ given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class);
+
+ // when
+ CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId);
+ cut.closeByStreamId(streamId);
+
+ // then
+ assertSame("should return proper publisher", cambriaPublisher, cachedPublisher);
+ verify(cambriaPublisher).close(20, TimeUnit.SECONDS);
+ }
+
+ private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) {
+ return new BaseMatcher<Exception>() {
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("exception cause should be an instance of " + clazz.getName());
+ }
+ };
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java
new file mode 100644
index 00000000..81c6556b
--- /dev/null
+++ b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java
@@ -0,0 +1,90 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.ves
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.commonFunction;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.verify;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import java.io.IOException;
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventPublisherHashTest {
+ private EventPublisherHash cut;
+
+ @Mock
+ private DmaapPublishers dmaapPublishers;
+ @Mock
+ private CambriaBatchingPublisher cambriaPublisher;
+
+ @Before
+ public void setUp() {
+ given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher);
+
+ cut = new EventPublisherHash(dmaapPublishers);
+ }
+
+ @Test
+ public void sendEventShouldSendEventToATopic() throws Exception {
+ // given
+ JSONObject event = new JSONObject("{}");
+ final String streamId = "sampleStreamId";
+
+ // when
+ cut.sendEvent(event, streamId);
+
+ // then
+ verify(cambriaPublisher).send("MyPartitionKey", event.toString());
+ }
+
+ @Test
+ public void sendEventShouldRemoveUuid() throws Exception {
+ // given
+ JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
+ final String streamId = "sampleStreamId";
+
+ // when
+ cut.sendEvent(event, streamId);
+
+ // then
+ verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString());
+ }
+
+ @Test
+ public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception {
+ // given
+ JSONObject event = new JSONObject("{}");
+ final String streamId = "sampleStreamId";
+ given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
+
+ // when
+ cut.sendEvent(event, streamId);
+
+ // then
+ verify(dmaapPublishers).closeByStreamId(streamId);
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java
index 821e7a19..bfdb3d76 100644
--- a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java
+++ b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java
@@ -1,15 +1,16 @@
/*-
* ============LICENSE_START=======================================================
- * PROJECT
+ * org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +21,13 @@
package org.onap.dcae.vestest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.theories.suppliers.TestedOn;
import org.onap.dcae.commonFunction.CommonStartup;
import org.onap.dcae.commonFunction.DmaapPropertyReader;
import org.onap.dcae.commonFunction.EventProcessor;
@@ -36,13 +40,13 @@ public class TestEventProcessor {
EventProcessor ec;
String ev= "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
String testinput;
-
+
@Before
public void setUp() throws Exception {
CommonStartup.streamid="fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling";
CommonStartup.eventTransformFlag = 1;
testinput = "src/test/resources/testDmaapConfig_ip.json";
-
+
}
@After
@@ -52,11 +56,11 @@ public class TestEventProcessor {
@Test
public void testLoad() {
-
+
EventProcessor ec = new EventProcessor();
-
+
ec.event=new org.json.JSONObject(ev);
-
+
ec.overrideEvent();
//event.commonEventHeader.sourceName
Boolean flag = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
@@ -69,12 +73,12 @@ public class TestEventProcessor {
DmaapPropertyReader dr = null;
EventPublisherHash eph = null;
-
+
Boolean flag = false;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
-
+
+
if (eph.equals(null))
{
flag = false;
@@ -84,55 +88,45 @@ public class TestEventProcessor {
flag = true;
}
assertEquals(true, flag);
-
-
+
+
}
-
+
@Test
public void testpublisherhashclassload() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
- Boolean flag = false;
+
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
EventProcessor ec = new EventProcessor();
- ec.event=new org.json.JSONObject(ev);
+ ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
-
- CambriaBatchingPublisher pub = eph.Dmaaphash("sec_fault_ueb");
-
- if (pub == null || pub.equals(null))
- {
- flag = false;
- }
- else
- {
- flag = true;
- }
- assertEquals(true, flag);
-
- }
+
+ CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb");
+
+ assertNotNull(pub);
+ }
@Test
public void testpublisherhashSend() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
+
Boolean flag = true;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
+
EventProcessor ec = new EventProcessor();
ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
eph.sendEvent(ec.event, "sec_fault_ueb");
-
+
assertEquals(true, flag);
-
+
}
}
diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties
index ad3ee2f1..a78258ec 100644
--- a/src/test/resources/test_collector_ip_op.properties
+++ b/src/test/resources/test_collector_ip_op.properties
@@ -1,78 +1,78 @@
-###############################################################################
-##
-## Collector Server config
-##
-## - Default values are shown as commented settings.
-##
-###############################################################################
-##
-## HTTP(S) service
-##
-## Normally:
-##
-## - 8080 is http service
-## - https is disabled by default (-1)
-##
-## - At this time, the server always binds to 0.0.0.0
-##
-## The default port when header.authflag is disabled (0)
-collector.service.port=-1
-
-## The secure port is required if header.authflag is set to 1 (true)
-## Authentication is only supported via secure port
-## When enabled - require valid keystore defined
-collector.service.secure.port=8443
-
-## The keystore must be setup per installation when secure port is configured
-collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks
-collector.keystore.passwordfile=/opt/app/dcae-certificate/.password
-collector.keystore.alias=dynamically generated
-
-
-###############################################################################
-## Processing
-##
-## If there's a problem that prevents the collector from processing alarms,
-## it's normally better to apply back pressure to the caller than to try to
-## buffer beyond a reasonable size limit. With a limit, the server won't crash
-## due to being out of memory, and the caller will get a 5xx reply saying the
-## server is in trouble.
-collector.inputQueue.maxPending=8096
-
-## Schema Validation checkflag
-## default no validation checkflag (-1)
-## If enabled (1) - schemafile location must be specified
-collector.schema.checkflag=1
-collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"}
-
-## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile
-collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary
-collector.dmaapfile=./etc/DmaapConfig.json
-
-## Custom ExceptionConfiguration
-exceptionConfig=./etc/ExceptionConfig.json
-
-## authflag control authentication by the collector
-## If enabled (1) - then authlist has to be defined
-## When authflag is enabled, only secure port will be supported
-## To disable enter 0
-header.authflag=1
-## Combination of userid,base64 encoded pwd list to be supported
-## userid and pwd comma separated; pipe delimitation between each pair
-header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2
-
-## Event transformation Flag - when set expects configurable transformation
-## defined under ./etc/eventTransform.json
-## Enabled by default; to disable set to 0
-event.transform.flag=1
-streams_subscribes = {}
-services_calls = {}
-tomcat.maxthreads = 200
-
-###############################################################################
-##
-## Tomcat control
-##
-#tomcat.maxthreads=(tomcat default, which is usually 200)
-
-
+###############################################################################
+##
+## Collector Server config
+##
+## - Default values are shown as commented settings.
+##
+###############################################################################
+##
+## HTTP(S) service
+##
+## Normally:
+##
+## - 8080 is http service
+## - https is disabled by default (-1)
+##
+## - At this time, the server always binds to 0.0.0.0
+##
+## The default port when header.authflag is disabled (0)
+collector.service.port=-1
+
+## The secure port is required if header.authflag is set to 1 (true)
+## Authentication is only supported via secure port
+## When enabled - require valid keystore defined
+collector.service.secure.port=8443
+
+## The keystore must be setup per installation when secure port is configured
+collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks
+collector.keystore.passwordfile=/opt/app/dcae-certificate/.password
+collector.keystore.alias=dynamically generated
+
+
+###############################################################################
+## Processing
+##
+## If there's a problem that prevents the collector from processing alarms,
+## it's normally better to apply back pressure to the caller than to try to
+## buffer beyond a reasonable size limit. With a limit, the server won't crash
+## due to being out of memory, and the caller will get a 5xx reply saying the
+## server is in trouble.
+collector.inputQueue.maxPending=8096
+
+## Schema Validation checkflag
+## default no validation checkflag (-1)
+## If enabled (1) - schemafile location must be specified
+collector.schema.checkflag=1
+collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"}
+
+## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile
+collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary
+collector.dmaapfile=./etc/DmaapConfig.json
+
+## Custom ExceptionConfiguration
+exceptionConfig=./etc/ExceptionConfig.json
+
+## authflag control authentication by the collector
+## If enabled (1) - then authlist has to be defined
+## When authflag is enabled, only secure port will be supported
+## To disable enter 0
+header.authflag=1
+## Combination of userid,base64 encoded pwd list to be supported
+## userid and pwd comma separated; pipe delimitation between each pair
+header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2
+
+## Event transformation Flag - when set expects configurable transformation
+## defined under ./etc/eventTransform.json
+## Enabled by default; to disable set to 0
+event.transform.flag=1
+streams_subscribes = {}
+services_calls = {}
+tomcat.maxthreads = 200
+
+###############################################################################
+##
+## Tomcat control
+##
+#tomcat.maxthreads=(tomcat default, which is usually 200)
+
+