diff options
8 files changed, 577 insertions, 262 deletions
@@ -1,6 +1,7 @@ <!--
================================================================================
Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+Copyright (c) 2018 Nokia. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -137,6 +138,13 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property. <version>1.10</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.18.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
diff --git a/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java new file mode 100644 index 00000000..35d3e317 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/CambriaPublisherFactory.java @@ -0,0 +1,69 @@ +package org.onap.dcae.commonFunction; +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CambriaPublisherFactory { + + private static Logger log = LoggerFactory.getLogger(CambriaPublisherFactory.class); + + public CambriaBatchingPublisher createCambriaPublisher(String streamId) + throws MalformedURLException, GeneralSecurityException { + String authpwd = null; + String ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash + .get(streamId + ".cambria.url"); + + if (ueburl == null) { + ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash + .get(streamId + ".cambria.hosts"); + } + String topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) + .getKeyValue(streamId + ".cambria.topic"); + String authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) + .getKeyValue(streamId + ".basicAuthUsername"); + + if (authuser != null) { + authpwd = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash + .get(streamId + ".basicAuthPassword"); + } + + if ((authuser != null) && (authpwd != null)) { + log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd)); + return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps() + .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5) + // .logTo(log) + // .limitBatch(100, 10) + .build(); + } else { + log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); + return new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic) + // .logTo(log) + .logSendFailuresAfter(5) + // .limitBatch(100, 10) + .build(); + } + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java new file mode 100644 index 00000000..c86a3742 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/DmaapPublishers.java @@ -0,0 +1,96 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DmaapPublishers { + + private static Logger log = LoggerFactory.getLogger(DmaapPublishers.class); + private final LoadingCache<String, CambriaBatchingPublisher> publishers; + + private DmaapPublishers( + LoadingCache<String, CambriaBatchingPublisher> publishers) { + this.publishers = publishers; + } + + static DmaapPublishers create() { + return create(new CambriaPublisherFactory()); + } + + static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) { + final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder() +// .expireAfterAccess(10, TimeUnit.MINUTES) + .removalListener(new RemovalListener<String, CambriaBatchingPublisher>() { + @Override + public void onRemoval(RemovalNotification<String, CambriaBatchingPublisher> notification) { + if (notification.getValue() != null) { + onCacheItemInvalidated(notification.getValue()); + } + } + }) + .build(new CacheLoader<String, CambriaBatchingPublisher>() { + @Override + public CambriaBatchingPublisher load(String streamId) + throws MalformedURLException, GeneralSecurityException { + try { + return publisherFactory.createCambriaPublisher(streamId); + } catch (MalformedURLException | GeneralSecurityException e) { + log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " " + + e.getMessage()); + throw e; + } + } + }); + return new DmaapPublishers(cache); + } + + public CambriaBatchingPublisher getByStreamId(String streamId) { + return publishers.getUnchecked(streamId); + } + + public void closeByStreamId(String streamId) { + publishers.invalidate(streamId); + } + + private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) { + try { + final List<?> stuck = pub.close(20, TimeUnit.SECONDS); + if (!stuck.isEmpty()) { + log.error(stuck.size() + " messages unsent"); + } + } catch (InterruptedException | IOException e) { + log.error("Caught Exception on Close event: {}", e); + } + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java index 49221418..474424a7 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java @@ -1,15 +1,16 @@ /*- * ============LICENSE_START======================================================= - * PROJECT + * org.onap.dcaegen2.collectors.ves * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -21,159 +22,73 @@ package org.onap.dcae.commonFunction; import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.google.gson.JsonArray; - +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class EventPublisherHash { - private static final String VES_UNIQUE_ID = "VESuniqueId"; - private static EventPublisherHash instance; - private CambriaBatchingPublisher pub; - - private String streamid = ""; - private String ueburl = ""; - private String topic = ""; - private String authuser = ""; - private String authpwd = ""; - - private static Logger log = LoggerFactory.getLogger(EventPublisherHash.class); - - protected static HashMap<String, CambriaBatchingPublisher> map = new HashMap<String, CambriaBatchingPublisher>(); - - - - public CambriaBatchingPublisher Dmaaphash(String newstreamid) { - pub = null; - streamid = newstreamid; - if (map != null && map.containsKey(streamid)){ - pub = map.get(streamid); - - } - else - { - - try { - ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash - .get(streamid + ".cambria.url"); - - if (ueburl == null) { - ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash - .get(streamid + ".cambria.hosts"); - } - topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) - .getKeyValue(streamid + ".cambria.topic"); - authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile) - .getKeyValue(streamid + ".basicAuthUsername"); - - if (authuser != null) { - authpwd = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash - .get(streamid + ".basicAuthPassword"); - } - - - if ((authuser != null) && (authpwd != null)) { - log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd)); - pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps() - .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5) - // .logTo(log) - // .limitBatch(100, 10) - .build(); - } else { - - log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic)); - pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic) - // .logTo(log) - .logSendFailuresAfter(5) - // .limitBatch(100, 10) - .build(); - - } - - map.put(streamid, pub); - - - } catch ( Exception e) { - log.error("CambriaClientBuilders connection reader exception : streamID - " + newstreamid + " " + e.getMessage()); - } - - } - return pub; - - } - - /** - * Returns event publisher - * @return event publisher - */ - public static synchronized EventPublisherHash getInstance() { - if (instance == null) { - instance = new EventPublisherHash(); - } - return instance; - - } - - - public synchronized void sendEvent(JSONObject event, String streamid) { - - log.debug("EventPublisher.sendEvent: instance for publish is ready"); - - if (event.has(VES_UNIQUE_ID)) { - String uuid = event.get(VES_UNIQUE_ID).toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - log.debug("Removing VESuniqueid object from event"); - event.remove(VES_UNIQUE_ID); - } - - try { - - - int pendingMsgs = Dmaaphash(streamid).send("MyPartitionKey", event.toString()); - // this.wait(2000); - - if (pendingMsgs > 100) { - log.info("Pending Message Count=" + pendingMsgs); - } - - log.info("pub.send invoked - no error"); - //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event)); - CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event)); - } catch (IOException | IllegalArgumentException e) { - log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e); - Dmaaphash(streamid).close(); - map.remove(streamid); - } - - } - - public synchronized void closePublisher() { - - try { - if (pub != null) { - - final List<?> stuck = pub.close(20, TimeUnit.SECONDS); - if (!stuck.isEmpty()) { - log.error(stuck.size() + " messages unsent"); - } - } - } catch (InterruptedException | IOException e) { - log.error("Caught Exception on Close event: {}", e); - } - - } + private static final String VES_UNIQUE_ID = "VESuniqueId"; + private static final Logger log = LoggerFactory.getLogger(EventPublisherHash.class); + private static volatile EventPublisherHash instance = new EventPublisherHash(DmaapPublishers.create()); + private final DmaapPublishers dmaapPublishers; + + /** + * Returns event publisher + * + * @return event publisher + */ + public static EventPublisherHash getInstance() { + return instance; + } + + @VisibleForTesting + EventPublisherHash(DmaapPublishers dmaapPublishers) { + this.dmaapPublishers = dmaapPublishers; + } + + public void sendEvent(JSONObject event, String streamid) { + log.debug("EventPublisher.sendEvent: instance for publish is ready"); + clearVesUniqueId(event); + + try { + sendEventUsingCachedPublisher(streamid, event); + } catch (IOException | IllegalArgumentException e) { + log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e); + dmaapPublishers.closeByStreamId(streamid); + } + } + + private void clearVesUniqueId(JSONObject event) { + if (event.has(VES_UNIQUE_ID)) { + String uuid = event.get(VES_UNIQUE_ID).toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + log.debug("Removing VESuniqueid object from event"); + event.remove(VES_UNIQUE_ID); + } + } + + private void sendEventUsingCachedPublisher(String streamid, JSONObject event) throws IOException { + int pendingMsgs = dmaapPublishers.getByStreamId(streamid).send("MyPartitionKey", event.toString()); + // this.wait(2000); + + if (pendingMsgs > 100) { + log.info("Pending Message Count=" + pendingMsgs); + } + + log.info("pub.send invoked - no error"); + //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event)); + CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event)); + } + + @VisibleForTesting + public CambriaBatchingPublisher getDmaapPublisher(String streamId) { + return dmaapPublishers.getByStreamId(streamId); + } } - diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java new file mode 100644 index 00000000..782b7c25 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaPublisher; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.concurrent.TimeUnit; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + + +@RunWith(MockitoJUnitRunner.class) +public class DmaapPublishersTest { + + @Mock + private CambriaPublisherFactory publisherFactory; + @Mock + private CambriaBatchingPublisher cambriaPublisher; + private DmaapPublishers cut; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() throws MalformedURLException, GeneralSecurityException { + given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher); + cut = DmaapPublishers.create(publisherFactory); + } + + @Test + public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException { + // given + String streamId = "sampleStream"; + + // when + CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId); + CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId); + + // then + verify(publisherFactory, times(1)).createCambriaPublisher(streamId); + assertSame("should return same instance", firstPublisher, secondPublisher); + } + + @Test + public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException { + // given + MalformedURLException exception = new MalformedURLException(); + given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception); + expectedException.expect(allOf( + instanceOf(UncheckedExecutionException.class), + causeIsInstanceOf(exception.getClass()))); + + // when + cut.getByStreamId("a stream"); + + // then + // exception should have been thrown + } + + @Test + public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException { + // given + String streamId = "sampleStream"; + given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))) + .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg"))); + + // when + CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); + cut.closeByStreamId(streamId); + + // then + assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); + verify(cambriaPublisher).close(20, TimeUnit.SECONDS); + } + + @Test + public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException { + // given + String streamId = "sampleStream"; + given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class); + + // when + CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); + cut.closeByStreamId(streamId); + + // then + assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); + verify(cambriaPublisher).close(20, TimeUnit.SECONDS); + } + + private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) { + return new BaseMatcher<Exception>() { + @Override + public boolean matches(Object o) { + return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause()); + } + + @Override + public void describeTo(Description description) { + description.appendText("exception cause should be an instance of " + clazz.getName()); + } + }; + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java new file mode 100644 index 00000000..81c6556b --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class EventPublisherHashTest { + private EventPublisherHash cut; + + @Mock + private DmaapPublishers dmaapPublishers; + @Mock + private CambriaBatchingPublisher cambriaPublisher; + + @Before + public void setUp() { + given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher); + + cut = new EventPublisherHash(dmaapPublishers); + } + + @Test + public void sendEventShouldSendEventToATopic() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + final String streamId = "sampleStreamId"; + + // when + cut.sendEvent(event, streamId); + + // then + verify(cambriaPublisher).send("MyPartitionKey", event.toString()); + } + + @Test + public void sendEventShouldRemoveUuid() throws Exception { + // given + JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); + final String streamId = "sampleStreamId"; + + // when + cut.sendEvent(event, streamId); + + // then + verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); + } + + @Test + public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + final String streamId = "sampleStreamId"; + given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); + + // when + cut.sendEvent(event, streamId); + + // then + verify(dmaapPublishers).closeByStreamId(streamId); + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java index 821e7a19..bfdb3d76 100644 --- a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java +++ b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java @@ -1,15 +1,16 @@ /*-
* ============LICENSE_START=======================================================
- * PROJECT
+ * org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +21,13 @@ package org.onap.dcae.vestest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.theories.suppliers.TestedOn;
import org.onap.dcae.commonFunction.CommonStartup;
import org.onap.dcae.commonFunction.DmaapPropertyReader;
import org.onap.dcae.commonFunction.EventProcessor;
@@ -36,13 +40,13 @@ public class TestEventProcessor { EventProcessor ec;
String ev= "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
String testinput;
-
+
@Before
public void setUp() throws Exception {
CommonStartup.streamid="fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling";
CommonStartup.eventTransformFlag = 1;
testinput = "src/test/resources/testDmaapConfig_ip.json";
-
+
}
@After
@@ -52,11 +56,11 @@ public class TestEventProcessor { @Test
public void testLoad() {
-
+
EventProcessor ec = new EventProcessor();
-
+
ec.event=new org.json.JSONObject(ev);
-
+
ec.overrideEvent();
//event.commonEventHeader.sourceName
Boolean flag = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
@@ -69,12 +73,12 @@ public class TestEventProcessor { DmaapPropertyReader dr = null;
EventPublisherHash eph = null;
-
+
Boolean flag = false;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
-
+
+
if (eph.equals(null))
{
flag = false;
@@ -84,55 +88,45 @@ public class TestEventProcessor { flag = true;
}
assertEquals(true, flag);
-
-
+
+
}
-
+
@Test
public void testpublisherhashclassload() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
- Boolean flag = false;
+
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
EventProcessor ec = new EventProcessor();
- ec.event=new org.json.JSONObject(ev);
+ ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
-
- CambriaBatchingPublisher pub = eph.Dmaaphash("sec_fault_ueb");
-
- if (pub == null || pub.equals(null))
- {
- flag = false;
- }
- else
- {
- flag = true;
- }
- assertEquals(true, flag);
-
- }
+
+ CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb");
+
+ assertNotNull(pub);
+ }
@Test
public void testpublisherhashSend() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
+
Boolean flag = true;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
+
EventProcessor ec = new EventProcessor();
ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
eph.sendEvent(ec.event, "sec_fault_ueb");
-
+
assertEquals(true, flag);
-
+
}
}
diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties index ad3ee2f1..a78258ec 100644 --- a/src/test/resources/test_collector_ip_op.properties +++ b/src/test/resources/test_collector_ip_op.properties @@ -1,78 +1,78 @@ -###############################################################################
-##
-## Collector Server config
-##
-## - Default values are shown as commented settings.
-##
-###############################################################################
-##
-## HTTP(S) service
-##
-## Normally:
-##
-## - 8080 is http service
-## - https is disabled by default (-1)
-##
-## - At this time, the server always binds to 0.0.0.0
-##
-## The default port when header.authflag is disabled (0)
-collector.service.port=-1
-
-## The secure port is required if header.authflag is set to 1 (true)
-## Authentication is only supported via secure port
-## When enabled - require valid keystore defined
-collector.service.secure.port=8443
-
-## The keystore must be setup per installation when secure port is configured
-collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks
-collector.keystore.passwordfile=/opt/app/dcae-certificate/.password
-collector.keystore.alias=dynamically generated
-
-
-###############################################################################
-## Processing
-##
-## If there's a problem that prevents the collector from processing alarms,
-## it's normally better to apply back pressure to the caller than to try to
-## buffer beyond a reasonable size limit. With a limit, the server won't crash
-## due to being out of memory, and the caller will get a 5xx reply saying the
-## server is in trouble.
-collector.inputQueue.maxPending=8096
-
-## Schema Validation checkflag
-## default no validation checkflag (-1)
-## If enabled (1) - schemafile location must be specified
-collector.schema.checkflag=1
-collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"}
-
-## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile
-collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary
-collector.dmaapfile=./etc/DmaapConfig.json
-
-## Custom ExceptionConfiguration
-exceptionConfig=./etc/ExceptionConfig.json
-
-## authflag control authentication by the collector
-## If enabled (1) - then authlist has to be defined
-## When authflag is enabled, only secure port will be supported
-## To disable enter 0
-header.authflag=1
-## Combination of userid,base64 encoded pwd list to be supported
-## userid and pwd comma separated; pipe delimitation between each pair
-header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2
-
-## Event transformation Flag - when set expects configurable transformation
-## defined under ./etc/eventTransform.json
-## Enabled by default; to disable set to 0
-event.transform.flag=1
-streams_subscribes = {}
-services_calls = {}
-tomcat.maxthreads = 200
-
-###############################################################################
-##
-## Tomcat control
-##
-#tomcat.maxthreads=(tomcat default, which is usually 200)
-
-
+############################################################################### +## +## Collector Server config +## +## - Default values are shown as commented settings. +## +############################################################################### +## +## HTTP(S) service +## +## Normally: +## +## - 8080 is http service +## - https is disabled by default (-1) +## +## - At this time, the server always binds to 0.0.0.0 +## +## The default port when header.authflag is disabled (0) +collector.service.port=-1 + +## The secure port is required if header.authflag is set to 1 (true) +## Authentication is only supported via secure port +## When enabled - require valid keystore defined +collector.service.secure.port=8443 + +## The keystore must be setup per installation when secure port is configured +collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks +collector.keystore.passwordfile=/opt/app/dcae-certificate/.password +collector.keystore.alias=dynamically generated + + +############################################################################### +## Processing +## +## If there's a problem that prevents the collector from processing alarms, +## it's normally better to apply back pressure to the caller than to try to +## buffer beyond a reasonable size limit. With a limit, the server won't crash +## due to being out of memory, and the caller will get a 5xx reply saying the +## server is in trouble. +collector.inputQueue.maxPending=8096 + +## Schema Validation checkflag +## default no validation checkflag (-1) +## If enabled (1) - schemafile location must be specified +collector.schema.checkflag=1 +collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"} + +## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile +collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary +collector.dmaapfile=./etc/DmaapConfig.json + +## Custom ExceptionConfiguration +exceptionConfig=./etc/ExceptionConfig.json + +## authflag control authentication by the collector +## If enabled (1) - then authlist has to be defined +## When authflag is enabled, only secure port will be supported +## To disable enter 0 +header.authflag=1 +## Combination of userid,base64 encoded pwd list to be supported +## userid and pwd comma separated; pipe delimitation between each pair +header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2 + +## Event transformation Flag - when set expects configurable transformation +## defined under ./etc/eventTransform.json +## Enabled by default; to disable set to 0 +event.transform.flag=1 +streams_subscribes = {} +services_calls = {} +tomcat.maxthreads = 200 + +############################################################################### +## +## Tomcat control +## +#tomcat.maxthreads=(tomcat default, which is usually 200) + + |