From 15a1bd474bdc463d744f6621e3c49761f6bf2927 Mon Sep 17 00:00:00 2001 From: Krysiak Adam Gabriel Date: Wed, 23 May 2018 15:34:37 +0200 Subject: Refactored event processor + sonar Issue-ID: DCAEGEN2-521 Change-Id: I9290f21701945cd1bb5e7a43a671991417f25491 Signed-off-by: Krysiak Adam Gabriel --- pom.xml | 6 + .../java/org/onap/dcae/commonFunction/Event.java | 34 +++ .../onap/dcae/commonFunction/EventProcessor.java | 298 +++++++++++---------- .../org/onap/dcae/commonFunction/Processor.java | 33 +++ .../commonFunction/ConfigProcessorAdapterTest.java | 66 +++++ .../dcae/commonFunction/EventProcessorTest.java | 117 ++++++++ .../org/onap/dcae/vestest/TestEventProcessor.java | 132 --------- 7 files changed, 407 insertions(+), 279 deletions(-) create mode 100644 src/main/java/org/onap/dcae/commonFunction/Event.java create mode 100644 src/main/java/org/onap/dcae/commonFunction/Processor.java create mode 100644 src/test/java/org/onap/dcae/commonFunction/ConfigProcessorAdapterTest.java create mode 100644 src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java delete mode 100644 src/test/java/org/onap/dcae/vestest/TestEventProcessor.java diff --git a/pom.xml b/pom.xml index 53a4a7cf..9c559c68 100644 --- a/pom.xml +++ b/pom.xml @@ -341,6 +341,12 @@ limitations under the License. 2.18.0 test + + org.assertj + assertj-core + 3.8.0 + test + diff --git a/src/main/java/org/onap/dcae/commonFunction/Event.java b/src/main/java/org/onap/dcae/commonFunction/Event.java new file mode 100644 index 00000000..faae2451 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/Event.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import com.google.gson.JsonObject; + +import java.util.List; + +class Event { + final JsonObject filter; + final List processors; + + Event(JsonObject filter, List processors) { + this.filter = filter; + this.processors = processors; + } +} diff --git a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java index c7c052f9..04687b32 100644 --- a/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java +++ b/src/main/java/org/onap/dcae/commonFunction/EventProcessor.java @@ -23,165 +23,169 @@ package org.onap.dcae.commonFunction; import com.att.nsa.clock.SaClock; import com.att.nsa.logging.LoggingContext; import com.att.nsa.logging.log4j.EcompFields; -import com.google.gson.JsonArray; -import com.google.gson.JsonParser; -import org.json.JSONArray; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.FileReader; -import java.lang.reflect.Constructor; +import java.io.IOException; import java.lang.reflect.Method; +import java.lang.reflect.Type; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.TimeZone; + public class EventProcessor implements Runnable { - private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); - private static final String EVENT_LITERAL = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - - private static HashMap streamidHash = new HashMap<>(); - public JSONObject event; - - public EventProcessor() { - log.debug("EventProcessor: Default Constructor"); - - String[] list = CommonStartup.streamid.split("\\|"); - for (String aList : list) { - String domain = aList.split("=")[0]; - - String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); - - log.debug(String.format("Domain: %s streamIdList:%s", domain, Arrays.toString(streamIdList))); - streamidHash.put(domain, streamIdList); - } - - } - - @Override - public void run() { - - try { - - event = CommonStartup.fProcessingInputQueue.take(); - - while (event != null) { - // As long as the producer is running we remove elements from - // the queue. - log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size()+ "\tEventProcessor\tRemoving element: " + event ); - - String uuid = event.get("VESuniqueId").toString(); - LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); - localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); - - log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" - + event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - String[] streamIdList = streamidHash - .get(event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain")); - log.debug("streamIdList:" + streamIdList); - - if (streamIdList.length == 0) { - log.error("No StreamID defined for publish - Message dropped" + event); - } else { - for (String aStreamIdList : streamIdList) { - log.info("Invoking publisher for streamId:" + aStreamIdList); - this.overrideEvent(); - - EventPublisherHash.getInstance().sendEvent(event, aStreamIdList); - - } - } - log.debug("Message published" + event); - event = CommonStartup.fProcessingInputQueue.take(); - - } - } catch (InterruptedException e) { - log.error("EventProcessor InterruptedException" + e.getMessage()); - Thread.currentThread().interrupt(); - } - - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void overrideEvent() { - // Set collector timestamp in event payload before publish - final Date currentTime = new Date(); - final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - - JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)); - JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); - commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); - event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); - - if (CommonStartup.eventTransformFlag == 1) { - // read the mapping json file - final JsonParser parser = new JsonParser(); - FileReader fr = null; - try { - fr = new FileReader("./etc/eventTransform.json"); - final JsonArray jo = (JsonArray) parser.parse(fr); - log.info("parse eventTransform.json"); - // now convert to org.json - final String jsonText = jo.toString(); - final JSONArray topLevel = new JSONArray(jsonText); - - Class[] paramJSONObject = new Class[1]; - paramJSONObject[0] = JSONObject.class; - // load VESProcessors class at runtime - Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors"); - Constructor constr = cls.getConstructor(paramJSONObject); - Object obj = constr.newInstance(event); - - for (int j = 0; j < topLevel.length(); j++) { - JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter"); - Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject); - boolean filterMet = (boolean) method.invoke(obj, filterObj); - if (filterMet) { - final JSONArray processors = topLevel.getJSONObject(j).getJSONArray("processors"); - - // call the processor method - for (int i = 0; i < processors.length(); i++) { - final JSONObject processorList = processors.getJSONObject(i); - final String functionName = processorList.getString("functionName"); - final JSONObject args = processorList.getJSONObject("args"); - - - log.info(String.format("functionName==%s | args==%s", functionName, args)); - // reflect method call - method = cls.getDeclaredMethod(functionName, paramJSONObject); - method.invoke(obj, args); - } - } - } - - } catch (Exception e) { - - log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); - } finally { - // close the file - if (fr != null) { - try { - fr.close(); - } catch (IOException e) { - log.error("Error closing file reader stream : " + e.toString()); - } - - } - } - } - // Remove VESversion from event. This field is for internal use and must - // be removed after use. - if (event.has("VESversion")) - event.remove("VESversion"); - - log.debug("Modified event:" + event); - - } + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final String EVENT_LITERAL = "event"; + private static final String COMMON_EVENT_HEADER = "commonEventHeader"; + static final Type EVENT_LIST_TYPE = new TypeToken>() { + }.getType(); + private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); + + private static HashMap streamidHash = new HashMap<>(); + JSONObject event; + + public EventProcessor() { + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); + String[] list = CommonStartup.streamid.split("\\|"); + for (String aList : list) { + String domain = aList.split("=")[0]; + + String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); + + log.debug(String.format("Domain: %s streamIdList:%s", domain, Arrays.toString(streamIdList))); + streamidHash.put(domain, streamIdList); + } + + } + + @Override + public void run() { + + try { + + event = CommonStartup.fProcessingInputQueue.take(); + + while (event != null) { + // As long as the producer is running we remove elements from + // the queue. + log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event); + + String uuid = event.get("VESuniqueId").toString(); + LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid); + localLC.put(EcompFields.kBeginTimestampMs, SaClock.now()); + + String domain = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain"); + log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + domain); + String[] streamIdList = streamidHash.get(domain); + log.debug("streamIdList:" + Arrays.toString(streamIdList)); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event); + } else { + sendEventsToStreams(streamIdList); + } + log.debug("Message published" + event); + event = CommonStartup.fProcessingInputQueue.take(); + + } + } catch (InterruptedException e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + Thread.currentThread().interrupt(); + } + + } + + public void overrideEvent() { + // Set collector timestamp in event payload before publish + addCurrentTimeToEvent(event); + + if (CommonStartup.eventTransformFlag == 1) { + // read the mapping json file + try (FileReader fr = new FileReader("./etc/eventTransform.json")) { + log.info("parse eventTransform.json"); + List events = new Gson().fromJson(fr, EVENT_LIST_TYPE); + parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(event))); + } catch (IOException e) { + log.error("Couldn't find file ./etc/eventTransform.json" + e.toString()); + } + } + // Remove VESversion from event. This field is for internal use and must + // be removed after use. + if (event.has("VESversion")) + event.remove("VESversion"); + + log.debug("Modified event:" + event); + + } + + private void sendEventsToStreams(String[] streamIdList) { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + this.overrideEvent(); + EventPublisherHash.getInstance().sendEvent(event, aStreamIdList); + + } + } + + private void addCurrentTimeToEvent(JSONObject event) { + final Date currentTime = new Date(); + JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime)); + JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER); + commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp); + event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey); + } + + void parseEventsJson(List eventsTransform, ConfigProcessorAdapter configProcessorAdapter) { + + // load VESProcessors class at runtime + for (Event eventTransform : eventsTransform) { + JSONObject filterObj = new JSONObject(eventTransform.filter.toString()); + if (configProcessorAdapter.isFilterMet(filterObj)) { + callProcessorsMethod(configProcessorAdapter, eventTransform.processors); + } + } + } + + private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List processors) { + // call the processor method + for (Processor processor : processors) { + final String functionName = processor.functionName; + final JSONObject args = new JSONObject(processor.args.toString()); + + log.info(String.format("functionName==%s | args==%s", functionName, args)); + // reflect method call + try { + configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args); + } catch (ReflectiveOperationException e) { + log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause()); + } + } + } + + static class ConfigProcessorAdapter { + private final ConfigProcessors configProcessors; + + ConfigProcessorAdapter(ConfigProcessors configProcessors) { + this.configProcessors = configProcessors; + } + + boolean isFilterMet(JSONObject parameter) { + return configProcessors.isFilterMet(parameter); + } + + void runConfigProcessorFunctionByName(String functionName, JSONObject parameter) throws ReflectiveOperationException { + Method method = configProcessors.getClass().getDeclaredMethod(functionName, parameter.getClass()); + method.invoke(configProcessors, parameter); + } + } } + diff --git a/src/main/java/org/onap/dcae/commonFunction/Processor.java b/src/main/java/org/onap/dcae/commonFunction/Processor.java new file mode 100644 index 00000000..ea79f1d3 --- /dev/null +++ b/src/main/java/org/onap/dcae/commonFunction/Processor.java @@ -0,0 +1,33 @@ +/*- + * ============LICENSE_START======================================================= + * PROJECT + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.commonFunction; + +import com.google.gson.JsonObject; + +class Processor { + final String functionName; + final JsonObject args; + + Processor(String functionName, JsonObject args) { + this.functionName = functionName; + this.args = args; + } +} diff --git a/src/test/java/org/onap/dcae/commonFunction/ConfigProcessorAdapterTest.java b/src/test/java/org/onap/dcae/commonFunction/ConfigProcessorAdapterTest.java new file mode 100644 index 00000000..634424b0 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/ConfigProcessorAdapterTest.java @@ -0,0 +1,66 @@ +// +// ================================================================================ +// Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. +// Copyright (c) 2018 Nokia. All rights reserved. +// ================================================================================ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END========================================================= +// +// +package org.onap.dcae.commonFunction; + +import org.json.JSONObject; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ConfigProcessorAdapterTest { + + @Mock + private ConfigProcessors configProcessors; + + @InjectMocks + private EventProcessor.ConfigProcessorAdapter configProcessorAdapter; + + + @Test + public void shouldCallIsFilterMetOnAdapter() throws Exception { + //given + JSONObject parameter = new JSONObject(); + when(configProcessors.isFilterMet(parameter)).thenReturn(true); + //when + boolean actualReturn = configProcessorAdapter.isFilterMet(parameter); + //then + assertTrue(actualReturn); + verify(configProcessors, times(1)).isFilterMet(parameter); + } + + @Test + public void shouldCallGivenMethodFromConfigProcessor() throws Exception { + JSONObject parameter = new JSONObject(); + String exampleFunction = "concatenateValue"; + //when + configProcessorAdapter.runConfigProcessorFunctionByName(exampleFunction, parameter); + //then + verify(configProcessors, times(1)).concatenateValue(parameter); + } + +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java new file mode 100644 index 00000000..a3a47720 --- /dev/null +++ b/src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.ves + * ================================================================================ + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.commonFunction; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.gson.Gson; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.FileNotFoundException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.onap.dcae.commonFunction.EventProcessor.EVENT_LIST_TYPE; + +public class EventProcessorTest { + + private final String ev = "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}"; + private String testinput = "src/test/resources/testDmaapConfig_ip.json"; + + @Before + public void setUp() throws Exception { + CommonStartup.streamid = "fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling"; + CommonStartup.eventTransformFlag = 1; + } + + @Test + public void testLoad() { + //given + EventProcessor ec = new EventProcessor(); + ec.event = new org.json.JSONObject(ev); + //when + ec.overrideEvent(); + + //then + Boolean hasSourceNameNode = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName"); + assertTrue(hasSourceNameNode); + } + + @Test + public void shouldParseJsonEvents() throws FileNotFoundException, ReflectiveOperationException { + //given + EventProcessor eventProcessor = new EventProcessor(); + String event_json = "[{ \"filter\": {\"event.commonEventHeader.domain\":\"heartbeat\",\"VESversion\":\"v4\"},\"processors\":[" + + "{\"functionName\": \"concatenateValue\",\"args\":{\"field\":\"event.commonEventHeader.eventName\",\"concatenate\": [\"$event.commonEventHeader.domain\",\"$event.commonEventHeader.eventType\",\"$event.faultFields.alarmCondition\"], \"delimiter\":\"_\"}}" + + ",{\"functionName\": \"addAttribute\",\"args\":{\"field\": \"event.heartbeatFields.heartbeatFieldsVersion\",\"value\": \"1.0\",\"fieldType\": \"number\"}}" + + ",{\"functionName\": \"map\",\"args\":{\"field\": \"event.commonEventHeader.nfNamingCode\",\"oldField\": \"event.commonEventHeader.functionalRole\"}}]}]"; + List events = new Gson().fromJson(event_json, EVENT_LIST_TYPE); + EventProcessor.ConfigProcessorAdapter configProcessorAdapter = mock(EventProcessor.ConfigProcessorAdapter.class); + + when(configProcessorAdapter.isFilterMet(any(JSONObject.class))).thenReturn(true); + ArgumentCaptor stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor jsonObjectArgumentCaptor = ArgumentCaptor.forClass(JSONObject.class); + //when + eventProcessor.parseEventsJson(events, configProcessorAdapter); + + //then + verify(configProcessorAdapter, times(3)).runConfigProcessorFunctionByName(stringArgumentCaptor.capture(), jsonObjectArgumentCaptor.capture()); + assertThat(stringArgumentCaptor.getAllValues()).contains("concatenateValue", "addAttribute", "map"); + } + + @Test + public void shouldCreateDmaapPublisher() { + + //given + EventPublisherHash eph = EventPublisherHash.getInstance(); + EventProcessor ec = new EventProcessor(); + ec.event = new org.json.JSONObject(ev); + CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; + + //when + CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb"); + + //then + assertNotNull(pub); + } + + @Test + public void shouldSendEventWithNoError() { + + EventPublisherHash eph = EventPublisherHash.getInstance(); + EventProcessor eventProcessor = new EventProcessor(); + eventProcessor.event = new org.json.JSONObject(ev); + CommonStartup.cambriaConfigFile = "src/test/resources/testDmaapConfig_ip.json"; + + //when + eph.sendEvent(eventProcessor.event, "sec_fault_ueb"); + } +} + diff --git a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java b/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java deleted file mode 100644 index bfdb3d76..00000000 --- a/src/test/java/org/onap/dcae/vestest/TestEventProcessor.java +++ /dev/null @@ -1,132 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dcaegen2.collectors.ves - * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. - * Copyright (C) 2018 Nokia. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.vestest; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.theories.suppliers.TestedOn; -import org.onap.dcae.commonFunction.CommonStartup; -import org.onap.dcae.commonFunction.DmaapPropertyReader; -import org.onap.dcae.commonFunction.EventProcessor; -import org.onap.dcae.commonFunction.EventPublisherHash; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; - -public class TestEventProcessor { - - EventProcessor ec; - String ev= "{\"event\": {\"commonEventHeader\": { \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0, \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}"; - String testinput; - - @Before - public void setUp() throws Exception { - CommonStartup.streamid="fault=sec_fault|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert|voiceQuality=ves_voicequality|sipSignaling=ves_sipsignaling"; - CommonStartup.eventTransformFlag = 1; - testinput = "src/test/resources/testDmaapConfig_ip.json"; - - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testLoad() { - - - EventProcessor ec = new EventProcessor(); - - ec.event=new org.json.JSONObject(ev); - - ec.overrideEvent(); - //event.commonEventHeader.sourceName - Boolean flag = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName"); - assertEquals(true, flag); - } - - - @Test - public void testpublisherhashclass() { - - DmaapPropertyReader dr = null; - EventPublisherHash eph = null; - - Boolean flag = false; - dr = new DmaapPropertyReader(testinput); - eph = EventPublisherHash.getInstance(); - - - if (eph.equals(null)) - { - flag = false; - } - else - { - flag = true; - } - assertEquals(true, flag); - - - } - - @Test - public void testpublisherhashclassload() { - - DmaapPropertyReader dr; - EventPublisherHash eph = null; - - dr = new DmaapPropertyReader(testinput); - eph = EventPublisherHash.getInstance(); - EventProcessor ec = new EventProcessor(); - ec.event=new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json"; - - CambriaBatchingPublisher pub = eph.getDmaapPublisher("sec_fault_ueb"); - - assertNotNull(pub); - } - - @Test - public void testpublisherhashSend() { - - DmaapPropertyReader dr; - EventPublisherHash eph = null; - - Boolean flag = true; - dr = new DmaapPropertyReader(testinput); - eph = EventPublisherHash.getInstance(); - - - EventProcessor ec = new EventProcessor(); - ec.event=new org.json.JSONObject(ev); - CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig_ip.json"; - eph.sendEvent(ec.event, "sec_fault_ueb"); - - assertEquals(true, flag); - - } -} - -- cgit 1.2.3-korg