diff options
Diffstat (limited to 'dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet')
4 files changed, 572 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java new file mode 100644 index 0000000..7755a13 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java @@ -0,0 +1,251 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; +import org.onap.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; + +import java.util.Date; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 9/12/2017. + */ +@SuppressWarnings("unchecked") +public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTest { + + private static final TCAPolicyPreferences sampleTCAPolicyPreferences = getSampleTCAPolicyPreferences(); + private static final List<MetricsPerEventName> metricsPerEventNames = sampleTCAPolicyPreferences + .getMetricsPerEventName(); + private final OutputEmitter<String> mockOutputEmitter = mock(OutputEmitter.class); + + private class TestTCAVESAlertsAbatementFlowlet extends TCAVESAlertsAbatementFlowlet { + + public TestTCAVESAlertsAbatementFlowlet(String tcaAlertsAbatementTableName) { + super(tcaAlertsAbatementTableName); + this.alertsAbatementOutputEmitter = mockOutputEmitter; + doNothing().when(mockOutputEmitter).emit(any(String.class)); + } + } + + @Test + public void testConfigure() throws Exception { + final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet = + new TCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + assertFlowletNameAndDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET, + CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET, + tcavesAlertsAbatementFlowlet); + } + + @Test(expected = CDAPSettingsException.class) + public void testDetermineAbatementAlertsWhenViolatedMetricsEventNameIsBlank() throws Exception { + + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ONSET); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + when(mockThresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn(""); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + } + + @Test + public void testDetermineAbatementAlertsWhenControlLoopTypeIsONSET() throws Exception { + + final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName"; + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + + final FlowletContext mockFlowletContext = mock(FlowletContext.class); + final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class); + when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable); + tcaAlertsAbatementFlowlet.initialize(mockFlowletContext); + + doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ONSET); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + verify(mockObjectMappedTable, + times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + verify(mockOutputEmitter, times(1)).emit(any(String.class)); + + } + + + @Test + public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousAlertWasSent() throws Exception { + + final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName"; + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + + final FlowletContext mockFlowletContext = mock(FlowletContext.class); + final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class); + when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable); + tcaAlertsAbatementFlowlet.initialize(mockFlowletContext); + + doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class); + when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity); + when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(null); + + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + verify(mockObjectMappedTable, + times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + verify(mockOutputEmitter, times(1)).emit(any(String.class)); + + } + + @Test + public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndPreviousAlertWasAlreadySent() throws + Exception { + + final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName"; + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + + final FlowletContext mockFlowletContext = mock(FlowletContext.class); + final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class); + when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable); + tcaAlertsAbatementFlowlet.initialize(mockFlowletContext); + + doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class); + when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity); + final long time = new Date().getTime(); + when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(Long.toString(time)); + + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + verify(mockObjectMappedTable, + times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + verify(mockOutputEmitter, times(0)).emit(any(String.class)); + + } + + + @Test + public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousONSETEventFound() throws + Exception { + + final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName"; + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + + final FlowletContext mockFlowletContext = mock(FlowletContext.class); + final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class); + when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable); + tcaAlertsAbatementFlowlet.initialize(mockFlowletContext); + + doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + when(mockObjectMappedTable.read(any(String.class))).thenReturn(null); + + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + verify(mockObjectMappedTable, + times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class)); + verify(mockOutputEmitter, times(0)).emit(any(String.class)); + + } + + @Test(expected = CDAPSettingsException.class) + public void testDetermineAbatementAlertsWhenControlLoopTypeIsNotOnsetOrAbated() throws + Exception { + final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet = + new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName"); + final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.CONTINUE); + final ThresholdCalculatorOutput mockThresholdCalculatorOutput = + getMockThresholdCalculatorOutput(violatedThreshold); + + tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput); + + } + + private static Threshold getViolatedThreshold(final ClosedLoopEventStatus closedLoopEventStatus) { + final Threshold violatedThreshold = Threshold.copy(metricsPerEventNames.get(0).getThresholds().get(0)); + violatedThreshold.setClosedLoopEventStatus(closedLoopEventStatus); + return violatedThreshold; + } + + + private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final Threshold violatedThreshold) throws + Exception { + + final MetricsPerEventName violatedMetricsPerEventName = + MetricsPerEventName.copy(metricsPerEventNames.get(0)); + violatedMetricsPerEventName.setThresholds(ImmutableList.of(violatedThreshold)); + return getMockThresholdCalculatorOutput( + fromStream(CEF_MESSAGE_JSON_FILE_LOCATION), + fromStream(TCA_POLICY_JSON_FILE_LOCATION), + TCAUtils.writeValueAsString(violatedMetricsPerEventName), + fromStream(TCA_ALERT_JSON_FILE_LOCATION) + ); + } + + + private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final String cefMessage, + final String tcaPolicy, + final String violatedMetricsPerEventName, + final String alertMessage) { + final ThresholdCalculatorOutput thresholdCalculatorOutput = mock(ThresholdCalculatorOutput.class); + when(thresholdCalculatorOutput.getCefMessage()).thenReturn(cefMessage); + when(thresholdCalculatorOutput.getTcaPolicy()).thenReturn(tcaPolicy); + when(thresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn(violatedMetricsPerEventName); + when(thresholdCalculatorOutput.getAlertMessage()).thenReturn(alertMessage); + return thresholdCalculatorOutput; + } + +} diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowletTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowletTest.java new file mode 100644 index 0000000..242c712 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsSinkFlowletTest.java @@ -0,0 +1,78 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity; +import org.onap.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 12/16/2016. + */ +public class TCAVESAlertsSinkFlowletTest extends BaseAnalyticsCDAPTCAUnitTest { + + + @Test + public void testConfigure() throws Exception { + final TCAVESAlertsSinkFlowlet tcavesAlertsSinkFlowlet = + new TCAVESAlertsSinkFlowlet("testTCAVESAlertTableName"); + assertFlowletNameAndDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET, + CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_SINK_DESCRIPTION_FLOWLET, tcavesAlertsSinkFlowlet); + } + + @SuppressWarnings("unchecked") + @Test + public void saveAlerts() throws Exception { + + final String testAlertTableName = "testTCAVESAlertTableName"; + + final TCAVESAlertsSinkFlowlet tcavesAlertsSinkFlowlet = new TCAVESAlertsSinkFlowlet(testAlertTableName); + + final FlowletContext mockFlowletContext = Mockito.mock(FlowletContext.class); + final ObjectMappedTable mockObjectMappedTable = Mockito.mock(ObjectMappedTable.class); + when(mockFlowletContext.getDataset(eq(testAlertTableName))).thenReturn(mockObjectMappedTable); + tcavesAlertsSinkFlowlet.initialize(mockFlowletContext); + final ObjectMappedTable tcaVESAlertsTableName = + getPrivateFiledValue(tcavesAlertsSinkFlowlet, "tcaVESAlertsTable", ObjectMappedTable.class); + assertTrue(tcaVESAlertsTableName == mockObjectMappedTable); + + doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAVESAlertEntity.class)); + final String testAlertMessage = "testMessage"; + tcavesAlertsSinkFlowlet.saveAlerts(testAlertMessage); + + verify(mockObjectMappedTable, + times(1)).write(any(String.class), any(TCAVESAlertEntity.class)); + + } + +} diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowletTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowletTest.java new file mode 100644 index 0000000..610227d --- /dev/null +++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESMessageRouterFlowletTest.java @@ -0,0 +1,80 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.flow.flowlet.StreamEvent; +import com.google.common.base.Charsets; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; + +import java.nio.ByteBuffer; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 12/19/2016. + */ +public class TCAVESMessageRouterFlowletTest extends BaseAnalyticsCDAPTCAUnitTest { + + private static final String TEST_MESSAGE = "test message"; + private final OutputEmitter mockOutputEmitter = Mockito.mock(OutputEmitter.class); + + private class TCATestVESMessageRouterFlowlet extends TCAVESMessageRouterFlowlet { + + @SuppressWarnings("unchecked") + public TCATestVESMessageRouterFlowlet() { + this.vesMessageEmitter = mockOutputEmitter; + doNothing().when(mockOutputEmitter).emit(eq(TEST_MESSAGE), + eq(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY), + eq(TEST_MESSAGE.hashCode())); + } + } + + @Test + public void testConfigure() throws Exception { + final TCAVESMessageRouterFlowlet tcavesMessageRouterFlowlet = new TCAVESMessageRouterFlowlet(); + assertFlowletNameAndDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_NAME_FLOWLET, + CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_DESCRIPTION_FLOWLET, tcavesMessageRouterFlowlet); + } + + @SuppressWarnings("unchecked") + @Test + public void routeVESMessage() throws Exception { + final TCATestVESMessageRouterFlowlet tcavesMessageRouterFlowlet = new TCATestVESMessageRouterFlowlet(); + final StreamEvent mockStreamEvent = Mockito.mock(StreamEvent.class); + final ByteBuffer testMessage = Charsets.UTF_8.encode(TEST_MESSAGE); + when(mockStreamEvent.getBody()).thenReturn(testMessage); + tcavesMessageRouterFlowlet.routeVESMessage(mockStreamEvent); + verify(mockOutputEmitter, + times(1)).emit(eq(TEST_MESSAGE), + eq(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY), + eq(TEST_MESSAGE.hashCode())); + } + +} diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowletTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowletTest.java new file mode 100644 index 0000000..988cd96 --- /dev/null +++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESThresholdViolationCalculatorFlowletTest.java @@ -0,0 +1,163 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.tca.flowlet; + +import co.cask.cdap.api.app.ApplicationSpecification; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.flow.flowlet.FlowletContext; +import co.cask.cdap.api.flow.flowlet.OutputEmitter; +import co.cask.cdap.api.metrics.Metrics; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput; +import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity; +import org.onap.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 12/19/2016. + */ +@SuppressWarnings("unchecked") +public class TCAVESThresholdViolationCalculatorFlowletTest extends BaseAnalyticsCDAPTCAUnitTest { + + private static final String messageStatusTableName = "TEST_MESSAGE_STATUS_TABLE"; + + private TCAVESThresholdViolationCalculatorFlowlet violationCalculatorFlowlet; + private Metrics metrics; + private OutputEmitter outputEmitter; + private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; + + private static class TCATestVESThresholdViolationCalculatorFlowlet extends + TCAVESThresholdViolationCalculatorFlowlet { + public TCATestVESThresholdViolationCalculatorFlowlet( + final String messageStatusTableName, + final OutputEmitter tcaAlertOutputEmitter, + ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable, + Metrics metrics) { + super(messageStatusTableName); + this.tcaAlertOutputEmitter = tcaAlertOutputEmitter; + this.metrics = metrics; + } + } + + @Before + public void before() { + violationCalculatorFlowlet = new TCAVESThresholdViolationCalculatorFlowlet(messageStatusTableName); + vesMessageStatusTable = Mockito.mock(ObjectMappedTable.class); + outputEmitter = Mockito.mock(OutputEmitter.class); + metrics = Mockito.mock(Metrics.class); + } + + @Test + public void testConfigure() throws Exception { + assertFlowletNameAndDescription( + CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_NAME_FLOWLET, + CDAPComponentsConstants.TCA_FIXED_VES_THRESHOLD_VIOLATION_CALCULATOR_DESCRIPTION_FLOWLET, + violationCalculatorFlowlet); + } + + @Test + public void testInitialize() throws Exception { + final FlowletContext mockFlowletContext = initializeFlowlet(violationCalculatorFlowlet, vesMessageStatusTable); + verify(mockFlowletContext, times(1)).getDataset(anyString()); + } + + @Test + public void testFilterVESMessagesWhenVESMessageIsInApplicable() throws Exception { + final TCATestVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = + createTestViolationCalculator(vesMessageStatusTable, outputEmitter, metrics); + initializeFlowlet(thresholdViolationCalculatorFlowlet, vesMessageStatusTable); + thresholdViolationCalculatorFlowlet.filterVESMessages("inapplicable"); + verify(vesMessageStatusTable, times(1)).write(anyString(), + any(TCAMessageStatusEntity.class)); + } + + @Test + public void testFilterVESMessagesWhenVESMessageIsCompliant() throws Exception { + final TCATestVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = + createTestViolationCalculator(vesMessageStatusTable, outputEmitter, metrics); + initializeFlowlet(thresholdViolationCalculatorFlowlet, vesMessageStatusTable); + thresholdViolationCalculatorFlowlet.filterVESMessages(getValidCEFMessage()); + verify(vesMessageStatusTable, times(1)).write(anyString(), + any(TCAMessageStatusEntity.class)); + } + + @Test + public void testFilterVESMessagesWhenVESMessageNonCompliant() throws Exception { + final TCATestVESThresholdViolationCalculatorFlowlet thresholdViolationCalculatorFlowlet = + createTestViolationCalculator(vesMessageStatusTable, outputEmitter, metrics); + final FlowletContext flowletContext = + initializeFlowlet(thresholdViolationCalculatorFlowlet, vesMessageStatusTable); + final TCAPolicy policy = CDAPTCAUtils.getValidatedTCAPolicyPreferences(flowletContext); + final Threshold threshold = policy.getMetricsPerEventName().get(0).getThresholds().get(0); + final Long thresholdValue = threshold.getThresholdValue(); + final EventListener thresholdViolatingMessage = getCEFEventListener(); + thresholdViolatingMessage.getEvent().getMeasurementsForVfScalingFields().getVNicPerformanceArray(). + get(0).setReceivedBroadcastPacketsAccumulated(thresholdValue - 1); + thresholdViolationCalculatorFlowlet.filterVESMessages( + ANALYTICS_MODEL_OBJECT_MAPPER.writeValueAsString(thresholdViolatingMessage)); + verify(vesMessageStatusTable, times(1)).write(anyString(), + any(TCAMessageStatusEntity.class)); + verify(outputEmitter, times(1)).emit(any(ThresholdCalculatorOutput.class)); + } + + private static TCATestVESThresholdViolationCalculatorFlowlet createTestViolationCalculator( + final ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable, + final OutputEmitter outputEmitter, final Metrics metrics) { + doNothing().when(outputEmitter).emit(anyString()); + doNothing().when(metrics).count(anyString(), anyInt()); + doNothing().when(vesMessageStatusTable).write(anyString(), any(TCAMessageStatusEntity.class)); + return new TCATestVESThresholdViolationCalculatorFlowlet(messageStatusTableName, outputEmitter, + vesMessageStatusTable, metrics); + } + + private static <T extends TCAVESThresholdViolationCalculatorFlowlet> FlowletContext initializeFlowlet( + T calculatorFlowlet, ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable) throws Exception { + final FlowletContext mockFlowletContext = getTestFlowletContextWithValidPolicy(); + when(mockFlowletContext.getDataset(anyString())).thenReturn(vesMessageStatusTable); + when(mockFlowletContext.getInstanceId()).thenReturn(1); + ApplicationSpecification mockApplicationSpecification = Mockito.mock(ApplicationSpecification.class); + when(mockApplicationSpecification.getConfiguration()).thenReturn(fromStream(TCA_APP_CONFIG_FILE_LOCATION)); + when(mockFlowletContext.getApplicationSpecification()).thenReturn(mockApplicationSpecification); + when(mockApplicationSpecification.getName()).thenReturn("TestTCAAppName"); + try { + calculatorFlowlet.initialize(mockFlowletContext); + return mockFlowletContext; + } catch (Exception e) { + LOG.error("error while flowlet initialization"); + throw new RuntimeException(e); + } + } + +} |