diff options
Diffstat (limited to 'src/test')
4 files changed, 340 insertions, 113 deletions
diff --git a/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java new file mode 100644 index 00000000..782b7c25 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/DmaapPublishersTest.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaPublisher; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.UncheckedExecutionException; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.concurrent.TimeUnit; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + + +@RunWith(MockitoJUnitRunner.class) +public class DmaapPublishersTest { + + @Mock + private CambriaPublisherFactory publisherFactory; + @Mock + private CambriaBatchingPublisher cambriaPublisher; + private DmaapPublishers cut; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setUp() throws MalformedURLException, GeneralSecurityException { + given(publisherFactory.createCambriaPublisher(anyString())).willReturn(cambriaPublisher); + cut = DmaapPublishers.create(publisherFactory); + } + + @Test + public void getByStreamIdShouldUseCachedItem() throws IOException, GeneralSecurityException { + // given + String streamId = "sampleStream"; + + // when + CambriaBatchingPublisher firstPublisher = cut.getByStreamId(streamId); + CambriaBatchingPublisher secondPublisher = cut.getByStreamId(streamId); + + // then + verify(publisherFactory, times(1)).createCambriaPublisher(streamId); + assertSame("should return same instance", firstPublisher, secondPublisher); + } + + @Test + public void getByStreamIdShouldHandleErrors() throws MalformedURLException, GeneralSecurityException { + // given + MalformedURLException exception = new MalformedURLException(); + given(publisherFactory.createCambriaPublisher(anyString())).willThrow(exception); + expectedException.expect(allOf( + instanceOf(UncheckedExecutionException.class), + causeIsInstanceOf(exception.getClass()))); + + // when + cut.getByStreamId("a stream"); + + // then + // exception should have been thrown + } + + @Test + public void closeByStreamIdShouldCloseConnection() throws IOException, InterruptedException { + // given + String streamId = "sampleStream"; + given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))) + .willReturn(ImmutableList.of(new CambriaPublisher.message("p", "msg"))); + + // when + CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); + cut.closeByStreamId(streamId); + + // then + assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); + verify(cambriaPublisher).close(20, TimeUnit.SECONDS); + } + + @Test + public void closeByStreamIdShouldHandleErrors() throws IOException, InterruptedException { + // given + String streamId = "sampleStream"; + given(cambriaPublisher.close(anyLong(), any(TimeUnit.class))).willThrow(IOException.class); + + // when + CambriaBatchingPublisher cachedPublisher = cut.getByStreamId(streamId); + cut.closeByStreamId(streamId); + + // then + assertSame("should return proper publisher", cambriaPublisher, cachedPublisher); + verify(cambriaPublisher).close(20, TimeUnit.SECONDS); + } + + private Matcher<Exception> causeIsInstanceOf(final Class<?> clazz) { + return new BaseMatcher<Exception>() { + @Override + public boolean matches(Object o) { + return o instanceof Throwable && clazz.isInstance(((Throwable) o).getCause()); + } + + @Override + public void describeTo(Description description) { + description.appendText("exception cause should be an instance of " + clazz.getName()); + } + }; + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java new file mode 100644 index 00000000..81c6556b --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/EventPublisherHashTest.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class EventPublisherHashTest { + private EventPublisherHash cut; + + @Mock + private DmaapPublishers dmaapPublishers; + @Mock + private CambriaBatchingPublisher cambriaPublisher; + + @Before + public void setUp() { + given(dmaapPublishers.getByStreamId(anyString())).willReturn(cambriaPublisher); + + cut = new EventPublisherHash(dmaapPublishers); + } + + @Test + public void sendEventShouldSendEventToATopic() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + final String streamId = "sampleStreamId"; + + // when + cut.sendEvent(event, streamId); + + // then + verify(cambriaPublisher).send("MyPartitionKey", event.toString()); + } + + @Test + public void sendEventShouldRemoveUuid() throws Exception { + // given + JSONObject event = new JSONObject("{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}"); + final String streamId = "sampleStreamId"; + + // when + cut.sendEvent(event, streamId); + + // then + verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString()); + } + + @Test + public void sendEventShouldCloseConnectionWhenExceptionOccurred() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + final String streamId = "sampleStreamId"; + given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); + + // when + cut.sendEvent(event, streamId); + + // then + verify(dmaapPublishers).closeByStreamId(streamId); + } +}
\ No newline at end of file diff --git a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java index 821e7a19..bfdb3d76 100644 --- a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java +++ b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java @@ -1,15 +1,16 @@ /*-
* ============LICENSE_START=======================================================
- * PROJECT
+ * org.onap.dcaegen2.collectors.ves
* ================================================================================
* Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +21,13 @@ package org.onap.dcae.vestest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.experimental.theories.suppliers.TestedOn;
import org.onap.dcae.commonFunction.CommonStartup;
import org.onap.dcae.commonFunction.DmaapPropertyReader;
import org.onap.dcae.commonFunction.EventProcessor;
@@ -36,13 +40,13 @@ public class TestEventProcessor { EventProcessor ec;
String ev= "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";
String testinput;
-
+
@Before
public void setUp() throws Exception {
CommonStartup.streamid="fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling";
CommonStartup.eventTransformFlag = 1;
testinput = "src/test/resources/testDmaapConfig_ip.json";
-
+
}
@After
@@ -52,11 +56,11 @@ public class TestEventProcessor { @Test
public void testLoad() {
-
+
EventProcessor ec = new EventProcessor();
-
+
ec.event=new org.json.JSONObject(ev);
-
+
ec.overrideEvent();
//event.commonEventHeader.sourceName
Boolean flag = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
@@ -69,12 +73,12 @@ public class TestEventProcessor { DmaapPropertyReader dr = null;
EventPublisherHash eph = null;
-
+
Boolean flag = false;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
-
+
+
if (eph.equals(null))
{
flag = false;
@@ -84,55 +88,45 @@ public class TestEventProcessor { flag = true;
}
assertEquals(true, flag);
-
-
+
+
}
-
+
@Test
public void testpublisherhashclassload() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
- Boolean flag = false;
+
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
EventProcessor ec = new EventProcessor();
- ec.event=new org.json.JSONObject(ev);
+ ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
-
- CambriaBatchingPublisher pub = eph.Dmaaphash("sec_fault_ueb");
-
- if (pub == null || pub.equals(null))
- {
- flag = false;
- }
- else
- {
- flag = true;
- }
- assertEquals(true, flag);
-
- }
+
+ CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb");
+
+ assertNotNull(pub);
+ }
@Test
public void testpublisherhashSend() {
DmaapPropertyReader dr;
EventPublisherHash eph = null;
-
+
Boolean flag = true;
dr = new DmaapPropertyReader(testinput);
eph = EventPublisherHash.getInstance();
-
+
EventProcessor ec = new EventProcessor();
ec.event=new org.json.JSONObject(ev);
CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json";
eph.sendEvent(ec.event, "sec_fault_ueb");
-
+
assertEquals(true, flag);
-
+
}
}
diff --git a/src/test/resources/test_collector_ip_op.properties b/src/test/resources/test_collector_ip_op.properties index ad3ee2f1..a78258ec 100644 --- a/src/test/resources/test_collector_ip_op.properties +++ b/src/test/resources/test_collector_ip_op.properties @@ -1,78 +1,78 @@ -###############################################################################
-##
-## Collector Server config
-##
-## - Default values are shown as commented settings.
-##
-###############################################################################
-##
-## HTTP(S) service
-##
-## Normally:
-##
-## - 8080 is http service
-## - https is disabled by default (-1)
-##
-## - At this time, the server always binds to 0.0.0.0
-##
-## The default port when header.authflag is disabled (0)
-collector.service.port=-1
-
-## The secure port is required if header.authflag is set to 1 (true)
-## Authentication is only supported via secure port
-## When enabled - require valid keystore defined
-collector.service.secure.port=8443
-
-## The keystore must be setup per installation when secure port is configured
-collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks
-collector.keystore.passwordfile=/opt/app/dcae-certificate/.password
-collector.keystore.alias=dynamically generated
-
-
-###############################################################################
-## Processing
-##
-## If there's a problem that prevents the collector from processing alarms,
-## it's normally better to apply back pressure to the caller than to try to
-## buffer beyond a reasonable size limit. With a limit, the server won't crash
-## due to being out of memory, and the caller will get a 5xx reply saying the
-## server is in trouble.
-collector.inputQueue.maxPending=8096
-
-## Schema Validation checkflag
-## default no validation checkflag (-1)
-## If enabled (1) - schemafile location must be specified
-collector.schema.checkflag=1
-collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"}
-
-## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile
-collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary
-collector.dmaapfile=./etc/DmaapConfig.json
-
-## Custom ExceptionConfiguration
-exceptionConfig=./etc/ExceptionConfig.json
-
-## authflag control authentication by the collector
-## If enabled (1) - then authlist has to be defined
-## When authflag is enabled, only secure port will be supported
-## To disable enter 0
-header.authflag=1
-## Combination of userid,base64 encoded pwd list to be supported
-## userid and pwd comma separated; pipe delimitation between each pair
-header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2
-
-## Event transformation Flag - when set expects configurable transformation
-## defined under ./etc/eventTransform.json
-## Enabled by default; to disable set to 0
-event.transform.flag=1
-streams_subscribes = {}
-services_calls = {}
-tomcat.maxthreads = 200
-
-###############################################################################
-##
-## Tomcat control
-##
-#tomcat.maxthreads=(tomcat default, which is usually 200)
-
-
+############################################################################### +## +## Collector Server config +## +## - Default values are shown as commented settings. +## +############################################################################### +## +## HTTP(S) service +## +## Normally: +## +## - 8080 is http service +## - https is disabled by default (-1) +## +## - At this time, the server always binds to 0.0.0.0 +## +## The default port when header.authflag is disabled (0) +collector.service.port=-1 + +## The secure port is required if header.authflag is set to 1 (true) +## Authentication is only supported via secure port +## When enabled - require valid keystore defined +collector.service.secure.port=8443 + +## The keystore must be setup per installation when secure port is configured +collector.keystore.file.location=/opt/app/dcae-certificate/keystore.jks +collector.keystore.passwordfile=/opt/app/dcae-certificate/.password +collector.keystore.alias=dynamically generated + + +############################################################################### +## Processing +## +## If there's a problem that prevents the collector from processing alarms, +## it's normally better to apply back pressure to the caller than to try to +## buffer beyond a reasonable size limit. With a limit, the server won't crash +## due to being out of memory, and the caller will get a 5xx reply saying the +## server is in trouble. +collector.inputQueue.maxPending=8096 + +## Schema Validation checkflag +## default no validation checkflag (-1) +## If enabled (1) - schemafile location must be specified +collector.schema.checkflag=1 +collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.json\"} + +## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile +collector.dmaap.streamid=fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary +collector.dmaapfile=./etc/DmaapConfig.json + +## Custom ExceptionConfiguration +exceptionConfig=./etc/ExceptionConfig.json + +## authflag control authentication by the collector +## If enabled (1) - then authlist has to be defined +## When authflag is enabled, only secure port will be supported +## To disable enter 0 +header.authflag=1 +## Combination of userid,base64 encoded pwd list to be supported +## userid and pwd comma separated; pipe delimitation between each pair +header.authlist=sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2 + +## Event transformation Flag - when set expects configurable transformation +## defined under ./etc/eventTransform.json +## Enabled by default; to disable set to 0 +event.transform.flag=1 +streams_subscribes = {} +services_calls = {} +tomcat.maxthreads = 200 + +############################################################################### +## +## Tomcat control +## +#tomcat.maxthreads=(tomcat default, which is usually 200) + + |