From c489a2eb22484e798c39a978bc8b61821b92322f Mon Sep 17 00:00:00 2001 From: an4828 Date: Mon, 22 Jan 2018 17:17:34 -0500 Subject: TCA: Replace any openecomp reference by onap Change-Id: I7c6d812ab5c1d7b30c63653d1974b0b1abc099be Signed-off-by: an4828 Issue-ID: DCAEGEN2-224 Signed-off-by: an4828 --- dcae-analytics-tca/pom.xml | 214 ++--- .../processor/AbstractTCAECEFPolicyProcessor.java | 61 ++ .../tca/processor/TCACEFJsonProcessor.java | 98 ++ .../tca/processor/TCACEFPolicyDomainFilter.java | 84 ++ .../tca/processor/TCACEFPolicyEventNameFilter.java | 91 ++ .../processor/TCACEFPolicyThresholdsProcessor.java | 138 +++ .../tca/processor/TCACEFProcessorContext.java | 103 ++ .../dcae/apod/analytics/tca/utils/TCAUtils.java | 1016 ++++++++++++++++++++ .../processor/AbstractTCAECEFPolicyProcessor.java | 61 -- .../tca/processor/TCACEFJsonProcessor.java | 98 -- .../tca/processor/TCACEFPolicyDomainFilter.java | 84 -- .../tca/processor/TCACEFPolicyEventNameFilter.java | 91 -- .../processor/TCACEFPolicyThresholdsProcessor.java | 138 --- .../tca/processor/TCACEFProcessorContext.java | 103 -- .../dcae/apod/analytics/tca/utils/TCAUtils.java | 1016 -------------------- .../analytics/tca/BaseAnalyticsTCAUnitTest.java | 162 ++++ .../AbstractTCAECEFPolicyProcessorTest.java | 56 ++ .../tca/processor/TCACEFJsonProcessorTest.java | 117 +++ .../processor/TCACEFPolicyDomainFilterTest.java | 74 ++ .../processor/TCACEFPolicyEventNameFilterTest.java | 75 ++ .../TCACEFPolicyThresholdsProcessorTest.java | 81 ++ .../tca/processor/TCACEFProcessorContextTest.java | 38 + .../apod/analytics/tca/utils/TCAUtilsTest.java | 418 ++++++++ .../analytics/tca/BaseAnalyticsTCAUnitTest.java | 162 ---- .../AbstractTCAECEFPolicyProcessorTest.java | 56 -- .../tca/processor/TCACEFJsonProcessorTest.java | 117 --- .../processor/TCACEFPolicyDomainFilterTest.java | 74 -- .../processor/TCACEFPolicyEventNameFilterTest.java | 75 -- .../TCACEFPolicyThresholdsProcessorTest.java | 81 -- .../tca/processor/TCACEFProcessorContextTest.java | 38 - .../apod/analytics/tca/utils/TCAUtilsTest.java | 418 -------- .../src/test/resources/logback-test.xml | 110 +-- 32 files changed, 2774 insertions(+), 2774 deletions(-) create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java create mode 100644 dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtils.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java delete mode 100644 dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java create mode 100644 dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtilsTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java delete mode 100644 dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtilsTest.java (limited to 'dcae-analytics-tca') diff --git a/dcae-analytics-tca/pom.xml b/dcae-analytics-tca/pom.xml index d6d26e8..5a87808 100644 --- a/dcae-analytics-tca/pom.xml +++ b/dcae-analytics-tca/pom.xml @@ -1,107 +1,107 @@ - - - - - - 4.0.0 - - - dcae-analytics - org.onap.dcaegen2.analytics.tca - 2.1.0-SNAPSHOT - - - dcae-analytics-tca - jar - - - DCAE Analytics TCA Core - DCAE Analytics TCA (THRESHOLD CROSSING ALERT) Core - - - ${project.parent.basedir} - - - - - - - org.onap.dcaegen2.analytics.tca - dcae-analytics-common - 2.1.0-SNAPSHOT - - - - org.onap.dcaegen2.analytics.tca - dcae-analytics-dmaap - 2.1.0-SNAPSHOT - - - - org.onap.dcaegen2.analytics.tca - dcae-analytics-aai - 2.1.0-SNAPSHOT - - - - - org.quartz-scheduler - quartz - - - - - org.slf4j - slf4j-api - - - - ch.qos.logback - logback-core - - - - ch.qos.logback - logback-classic - - - - - com.google.code.findbugs - jsr305 - - - com.google.code.findbugs - annotations - - - - - org.onap.dcaegen2.analytics.tca - dcae-analytics-test - 2.1.0-SNAPSHOT - test - - - - + + + + + + 4.0.0 + + + dcae-analytics + org.onap.dcaegen2.analytics.tca + 2.1.0-SNAPSHOT + + + dcae-analytics-tca + jar + + + DCAE Analytics TCA Core + DCAE Analytics TCA (THRESHOLD CROSSING ALERT) Core + + + ${project.parent.basedir} + + + + + + + org.onap.dcaegen2.analytics.tca + dcae-analytics-common + 2.1.0-SNAPSHOT + + + + org.onap.dcaegen2.analytics.tca + dcae-analytics-dmaap + 2.1.0-SNAPSHOT + + + + org.onap.dcaegen2.analytics.tca + dcae-analytics-aai + 2.1.0-SNAPSHOT + + + + + org.quartz-scheduler + quartz + + + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-core + + + + ch.qos.logback + logback-classic + + + + + com.google.code.findbugs + jsr305 + + + com.google.code.findbugs + annotations + + + + + org.onap.dcaegen2.analytics.tca + dcae-analytics-test + 2.1.0-SNAPSHOT + test + + + + diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java new file mode 100644 index 0000000..b2da5a4 --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java @@ -0,0 +1,61 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +/** + *

+ * Encapsulates common functionality for all TCA CEF Policy Processors + *

+ * + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public abstract class AbstractTCAECEFPolicyProcessor extends AbstractMessageProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractTCAECEFPolicyProcessor.class); + + /** + * For all TCA Policy Processor the pre processor ensures that {@link EventListener} object is + * present + * + * @param processorContext incoming Processor Context + * @return Pre processed Processor Context + */ + @Override + public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { + // validates CEF Event Listener is Present + final EventListener cefEventListener = processorContext.getCEFEventListener(); + if (cefEventListener == null) { + final String errorMessage = String.format( + "CEF Event Listener is not Present.Invalid use of Processor: %s. CEF Message: %s", + getProcessorInfo().getProcessorName(), processorContext.getMessage()); + throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return super.preProcessor(processorContext); + } +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java new file mode 100644 index 0000000..fdeeb3d --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java @@ -0,0 +1,98 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.apache.commons.lang3.StringUtils; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + *

+ * Processor that converts incoming presumed JSON string CEF message to {@link EventListener} object + *
+ * Pre Conditions: None + *

+ * + * @author Rajiv Singla . Creation Date: 11/5/2016. + */ +public class TCACEFJsonProcessor extends AbstractMessageProcessor { + + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(TCACEFJsonProcessor.class); + + + @Override + public String getProcessorDescription() { + return "Converts incoming TCA CEF Message to Event Listener object"; + } + + @Override + public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { + + final String cefMessage = processorContext.getMessage(); + + // If CEF Message is null then processor should stop processing + if (cefMessage == null) { + String errorMessage = "Null CEF message cannot be converted to CEF Event Listener Object"; + throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + + // If CEF Message is blank then processor stop processing + if (StringUtils.isBlank(cefMessage)) { + setTerminatingProcessingMessage("Blank CEF message cannot be converted to CEF Event Listener Object", + processorContext); + return processorContext; + } + + // trim cef message + final String trimmedCEFMessage = cefMessage.trim(); + + // if message does not start with curly brace and ends with curly brace, it is not a valid cef message + // processor will stop processing + if (!(trimmedCEFMessage.startsWith("{") && trimmedCEFMessage.endsWith("}"))) { + setTerminatingProcessingMessage("CEF Message must start with curly brace and must end with curly brace", + processorContext); + return processorContext; + } + + // try parsing the cef message + try { + final EventListener eventListener = TCAUtils.readValue(trimmedCEFMessage, EventListener.class); + setFinishedProcessingMessage("CEF JSON to Event Listener Conversion Successful", processorContext); + // set new Event Listener in the Processor Context + processorContext.setCEFEventListener(eventListener); + return processorContext; + } catch (IOException e) { + final String errorMessage = String.format("Parsing Failed for CEF Message: %s, Error: %s", cefMessage, e); + // If parsing fails throw an exception + throw new MessageProcessingException(errorMessage, LOG, e); + } + + } +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java new file mode 100644 index 0000000..e5db327 --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java @@ -0,0 +1,84 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.onap.dcae.apod.analytics.model.domain.cef.Domain; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; + +/** + *

+ * TCA Processor which acts like a filter to filter out messages which does not belong to TCA Policy Domain + *
+ * Pre Conditions: CEF Event Listener must be present + *

+ * + * @author Rajiv Singla . Creation Date: 11/7/2016. + */ +public class TCACEFPolicyDomainFilter extends AbstractTCAECEFPolicyProcessor { + + + private static final long serialVersionUID = 1L; + + @Override + public String getProcessorDescription() { + return "Filters out CEF Messages which does not match TCAPolicy Domain"; + } + + @Override + public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { + + // Safe to get event Listener here without null check as pre processor will validate if + // event listener is indeed present + final EventListener eventListener = processorContext.getCEFEventListener(); + + Domain cefMessageDomain; + + // Extract CEF domain as it is must be present as per CEF Schema + if (eventListener.getEvent() != null && + eventListener.getEvent().getCommonEventHeader() != null && + eventListener.getEvent().getCommonEventHeader().getDomain() != null) { + cefMessageDomain = eventListener.getEvent().getCommonEventHeader().getDomain(); + + } else { + final String terminatingMessage = "Invalid CEF Message.Common Event Header Domain not present."; + setTerminatingProcessingMessage(terminatingMessage, processorContext); + return processorContext; + } + + // Get Policy Domain. TCA Policy Validation must ensure that Domain is indeed present + // no null check will be required here + final String policyDomain = processorContext.getTCAPolicy().getDomain(); + + // If Policy domain matches CEF message domain then continue processing + if (cefMessageDomain.toString().equalsIgnoreCase(policyDomain)) { + final String finishMessage = String.format("Policy Domain and CEF Message Domain match successful." + + " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); + setFinishedProcessingMessage(finishMessage, processorContext); + } else { + // If policy domain does not match with CEF message terminate processing chain + final String terminatingMessage = String.format("Policy Domain and CEF Message Domain match unsuccessful." + + " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); + setTerminatingProcessingMessage(terminatingMessage, processorContext); + } + + return processorContext; + } +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java new file mode 100644 index 0000000..73cbe78 --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java @@ -0,0 +1,91 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import com.google.common.base.Joiner; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; + +import java.util.List; + +import static org.onap.dcae.apod.analytics.tca.utils.TCAUtils.getPolicyEventNamesSupplier; + +/** + *

+ * TCA Processor that acts like a filter to filter out messages which does not belong to TCA Policy Event Name + *
+ * Pre Conditions: CEF Event Listener must be present + *

+ * + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public class TCACEFPolicyEventNameFilter extends AbstractTCAECEFPolicyProcessor { + + private static final long serialVersionUID = 1L; + + @Override + public String getProcessorDescription() { + return "Filters out CEF Messages which does not match Policy Functional Roles"; + } + + @Override + public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { + + // Safe to get event Listener here without null check as pre processor will validate if + // event listener is indeed present + final EventListener eventListener = processorContext.getCEFEventListener(); + + String cefMessageEventName; + + if (eventListener.getEvent() != null && + eventListener.getEvent().getCommonEventHeader() != null && + eventListener.getEvent().getCommonEventHeader().getEventName() != null) { + cefMessageEventName = eventListener.getEvent().getCommonEventHeader().getEventName(); + } else { + String terminationMessage = "Invalid CEF Message.Common Event Header Event Name not present."; + setTerminatingProcessingMessage(terminationMessage, processorContext); + return processorContext; + } + + // Determine Policy Functional Roles + final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); + final List policyEventNames = getPolicyEventNamesSupplier(tcaPolicy).get(); + final String policyEventNamesString = Joiner.on(",").join(policyEventNames); + + // If Policy event names contains CEF message event names then continue processing + if (policyEventNames.contains(cefMessageEventName)) { + final String finishMessage = String.format( + "Policy Event Name and CEF Message Event Name match successful." + + "Message EventName: %s, Policy Event Names: %s", + cefMessageEventName, policyEventNamesString); + setFinishedProcessingMessage(finishMessage, processorContext); + } else { + // If Policy event names does not contain CEF message event names then terminate processing + final String terminatingMessage = String.format( + "Policy Event name and CEF Message Event name match unsuccessful." + + "Message EventName: %s, Policy Event Names: %s", + cefMessageEventName, policyEventNamesString); + setTerminatingProcessingMessage(terminatingMessage, processorContext); + } + + return processorContext; + } +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java new file mode 100644 index 0000000..c7d780d --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java @@ -0,0 +1,138 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import com.google.common.base.Optional; +import com.google.common.collect.Table; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.model.domain.cef.Domain; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nonnull; + +/** + *

+ * TCA CEF Policy Threshold processor + *
+ * Pre Conditions: Domain and Functional Role must be present in CEF Event Listener Object + *

+ * + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public class TCACEFPolicyThresholdsProcessor extends AbstractTCAECEFPolicyProcessor { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(TCACEFPolicyThresholdsProcessor.class); + + @Override + public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { + // validates Domain and Functional Role are present + final EventListener eventListener = processorContext.getCEFEventListener(); + final Domain domain = eventListener.getEvent().getCommonEventHeader().getDomain(); + final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); + if (domain == null || eventName == null) { + final String errorMessage = "CEF Event Listener domain or eventName not Present. " + + "Invalid use of this Processor"; + throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + return super.preProcessor(processorContext); + } + + @Override + public String getProcessorDescription() { + return "Applies TCA Policy rules to incoming CEF message. If any thresholds are violated attaches max " + + "Severity violated threshold to TCA Processor Context"; + } + + @Override + public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { + + final String cefMessage = processorContext.getMessage(); + + // Determine domain and eventName + final EventListener eventListener = processorContext.getCEFEventListener(); + final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); + + // Get Table containing event Name and Thresholds Field Path + final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); + final Table> eventNameFieldPathsTable = + TCAUtils.getPolicyEventNameThresholdsTableSupplier(tcaPolicy).get(); + + // Get Policy Field Paths for that event Name + final Map> policyFieldPathsMap = eventNameFieldPathsTable.row(eventName); + final Set policyFieldPaths = policyFieldPathsMap.keySet(); + + // Get Json Values for Policy Fields + final Map> messageFieldValuesMap = + TCAUtils.getJsonPathValue(cefMessage, policyFieldPaths); + + // Determine all violated thresholds per message field Path + final Map violatedThresholdsMap = new HashMap<>(); + for (Map.Entry> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) { + final String messageFieldPath = messageFieldValuesMapEntry.getKey(); + final List messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath); + if (messageFieldAssociatedPolicyThresholds != null) { + final Optional thresholdOptional = TCAUtils.thresholdCalculator( + messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds); + if (thresholdOptional.isPresent()) { + violatedThresholdsMap.put(messageFieldPath, thresholdOptional.get()); + } + } + } + + // No threshold were violated + if (violatedThresholdsMap.isEmpty()) { + + final String terminationMessage = "No Policy Threshold violated by the VES CEF Message."; + setTerminatingProcessingMessage(terminationMessage, processorContext); + + } else { + + // If there are policy violations then determine max priority violation + final Threshold maxSeverityThresholdViolation = + TCAUtils.prioritizeThresholdViolations(violatedThresholdsMap); + final MetricsPerEventName violatedMetrics = TCAUtils.createViolatedMetrics(tcaPolicy, + maxSeverityThresholdViolation, eventName); + // attach policy violation to processor Context + processorContext.setMetricsPerEventName(violatedMetrics); + + final String finishMessage = String.format("Policy Threshold violation detected for threshold: %s", + maxSeverityThresholdViolation); + setFinishedProcessingMessage(finishMessage, processorContext); + + } + + return processorContext; + } +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java new file mode 100644 index 0000000..4448b6b --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java @@ -0,0 +1,103 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.onap.dcae.apod.analytics.common.service.processor.AbstractProcessorContext; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; + +/** + * TCA CEF Policy Processor Context + * + * @author Rajiv Singla . Creation Date: 11/7/2016. + */ +public class TCACEFProcessorContext extends AbstractProcessorContext { + + private static final long serialVersionUID = 1L; + + private final TCAPolicy tcaPolicy; + private EventListener eventListener; + private MetricsPerEventName metricsPerEventName; + + public TCACEFProcessorContext(final String message, boolean canProcessingContinue, final TCAPolicy tcaPolicy) { + super(message, canProcessingContinue); + this.tcaPolicy = tcaPolicy; + // present only if cef incoming message can be parsed successfully to Event Listener Object + this.eventListener = null; + // present only if there are any threshold violations are detected + this.metricsPerEventName = null; + } + + // Auxiliary Constructor which default canProcessingContinue Flag to true + public TCACEFProcessorContext(final String message, final TCAPolicy tcaPolicy) { + this(message, true, tcaPolicy); + } + + /** + * Returns {@link TCAPolicy} Object + * + * @return TCA Policy + */ + public TCAPolicy getTCAPolicy() { + return tcaPolicy; + } + + /** + * Returns Common Event Format {@link EventListener} if present else null + * + * @return CEF Event Listener + */ + public EventListener getCEFEventListener() { + return eventListener; + } + + + /** + * Sets new {@link EventListener} + * + * @param eventListener set new value for CEF event listener + */ + public void setCEFEventListener(final EventListener eventListener) { + this.eventListener = eventListener; + } + + + /** + * Returns TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message if + * present else null + * + * @return Violated Threshold + */ + public MetricsPerEventName getMetricsPerEventName() { + return metricsPerEventName; + } + + /** + * Assign new TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message + * + * @param metricsPerEventName new value for Metrics Per Functional Role with violated threshold + */ + public void setMetricsPerEventName(MetricsPerEventName metricsPerEventName) { + this.metricsPerEventName = metricsPerEventName; + } + +} diff --git a/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtils.java b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtils.java new file mode 100644 index 0000000..fdb13b7 --- /dev/null +++ b/dcae-analytics-tca/src/main/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtils.java @@ -0,0 +1,1016 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.TypeRef; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; +import org.onap.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor; +import org.onap.dcae.apod.analytics.model.domain.cef.AlertAction; +import org.onap.dcae.apod.analytics.model.domain.cef.AlertType; +import org.onap.dcae.apod.analytics.model.domain.cef.CommonEventHeader; +import org.onap.dcae.apod.analytics.model.domain.cef.Criticality; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity; +import org.onap.dcae.apod.analytics.model.domain.cef.PerformanceCounter; +import org.onap.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.facade.tca.AAI; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.model.util.AnalyticsModelIOUtils; +import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.SimpleTrigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static com.google.common.collect.Lists.newArrayList; +import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT; + +/** + * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get + * pre configured Json Object Mapper understand serialization and deserialization of CEF Message + * and TCA Policy + * + * @author Rajiv Singla . Creation Date: 10/24/2016. + */ +public abstract class TCAUtils extends AnalyticsModelJsonUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class); + + /** + * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR, + * WARNING ) + */ + private static final Comparator THRESHOLD_COMPARATOR = new Comparator() { + @Override + public int compare(Threshold threshold1, Threshold threshold2) { + return threshold1.getSeverity().compareTo(threshold2.getSeverity()); + } + }; + + /** + * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy} + * + * @return TCA Policy Metrics Per Event Name list + */ + public static Function> tcaPolicyMetricsExtractorFunction() { + return new Function>() { + @Nullable + @Override + public List apply(@Nonnull TCAPolicy tcaPolicy) { + return tcaPolicy.getMetricsPerEventName(); + } + }; + } + + /** + * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from + * {@link MetricsPerEventName} + * + * @return Event Names or a Metrics Per Event Name object + */ + public static Function tcaEventNameExtractorFunction() { + return new Function() { + @Override + public String apply(@Nonnull MetricsPerEventName metricsPerEventName) { + return metricsPerEventName.getEventName(); + } + }; + } + + + /** + * Extracts {@link TCAPolicy} Event Names + * + * @param tcaPolicy TCA Policy + * @return List of event names in the TCA Policy + */ + public static List getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) { + final List metricsPerEventNames = + tcaPolicyMetricsExtractorFunction().apply(tcaPolicy); + + return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction()); + } + + /** + * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to + * change during runtime + * + * @param tcaPolicy TCA Policy + * @return a Supplier that memoize the TCA Policy event names + */ + public static Supplier> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) { + return Suppliers.memoize(new Supplier>() { + @Override + public List get() { + return getPolicyEventNames(tcaPolicy); + } + }); + } + + + /** + * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path + * + * @param tcaPolicy TCA Policy + * @return A table with Keys of event name and field path containing List of threshold as values + */ + public static Table> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) { + final Table> domainFRTable = HashBasedTable.create(); + for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) { + final String eventName = metricsPerEventName.getEventName(); + final List thresholds = metricsPerEventName.getThresholds(); + for (Threshold threshold : thresholds) { + final List existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath()); + if (existingThresholds == null) { + final LinkedList newThresholdList = new LinkedList<>(); + newThresholdList.add(threshold); + domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList); + } else { + domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold); + } + } + } + return domainFRTable; + } + + + /** + * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table + * + * @param tcaPolicy TCA Policy + * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values + */ + public static Supplier>> getPolicyEventNameThresholdsTableSupplier + (final TCAPolicy tcaPolicy) { + return Suppliers.memoize(new Supplier>>() { + @Override + public Table> get() { + return getPolicyEventNameThresholdsTable(tcaPolicy); + } + }); + } + + + /** + * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor}, + * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to + * filter out messages which does not match policy domain or event Name + * + * @param cefMessage CEF Message + * @param tcaPolicy TCA Policy + * @return Message Process Context after processing filter chain + */ + public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage, + @Nonnull final TCAPolicy tcaPolicy) { + + final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor(); + final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter(); + final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter(); + // Create a list of message processors + final ImmutableList> messageProcessors = + ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter); + final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy); + // Create a message processors chain + final GenericMessageChainProcessor tcaProcessingChain = + new GenericMessageChainProcessor<>(messageProcessors, processorContext); + // process chain + return tcaProcessingChain.processChain(); + } + + + /** + * Extracts json path values for given json Field Paths from using Json path notation. Assumes + * that values extracted are always long + * + * @param message CEF Message + * @param jsonFieldPaths Json Field Paths + * @return Map containing key as json path and values as values associated with that json path + */ + public static Map> getJsonPathValue(@Nonnull String message, @Nonnull Set + jsonFieldPaths) { + + final Map> jsonFieldPathMap = new HashMap<>(); + final DocumentContext documentContext = JsonPath.parse(message); + + for (String jsonFieldPath : jsonFieldPaths) { + List jsonFieldValues = null; + + try { + jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef>() { + }); + } catch (Exception e) { + final String errorMessage = String.format( + "Unable to convert jsonFieldPath: %s value to valid number. " + + "Json Path value is not in a valid number format. Incoming message: %s", + jsonFieldPath, message); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + // If Json Field Values are not or empty + if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) { + // Filter out all null values in the filed values list + final List nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues, + Predicates.notNull())); + // If there are non null values put them in the map + if (!nonNullValues.isEmpty()) { + jsonFieldPathMap.put(jsonFieldPath, nonNullValues); + } + } + } + + return jsonFieldPathMap; + } + + /** + * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path + * it applies threshold in order of their severity and record the first threshold per message field path + * + * @param messageFieldValues Field Path Values extracted from CEF Message + * @param fieldThresholds Policy Thresholds for Field Path + * @return Optional of violated threshold for a field path + */ + public static Optional thresholdCalculator(final List messageFieldValues, final + List + fieldThresholds) { + // order thresholds by severity + Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR); + // Now apply each threshold to field values + for (Threshold fieldThreshold : fieldThresholds) { + for (BigDecimal messageFieldValue : messageFieldValues) { + final Boolean isThresholdViolated = + fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold + .getThresholdValue())); + if (isThresholdViolated) { + final Threshold violatedThreshold = Threshold.copy(fieldThreshold); + violatedThreshold.setActualFieldValue(messageFieldValue); + return Optional.of(violatedThreshold); + } + } + } + return Optional.absent(); + } + + /** + * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message. + * Grabs first highest priority violated threshold + * + * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds + * @return First Highest priority violated threshold + */ + public static Threshold prioritizeThresholdViolations(final Map violatedThresholdsMap) { + + final List violatedThresholds = newArrayList(violatedThresholdsMap.values()); + + if (violatedThresholds.size() == 1) { + return violatedThresholds.get(0); + } + Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR); + // Just grab the first violated threshold with highest priority + return violatedThresholds.get(0); + } + + + /** + * Creates {@link MetricsPerEventName} object which contains violated thresholds + * + * @param tcaPolicy TCA Policy + * @param violatedThreshold Violated thresholds + * @param eventName Event Name + * + * @return MetricsPerEventName object containing one highest severity violated threshold + */ + public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy, + @Nonnull final Threshold violatedThreshold, + @Nonnull final String eventName) { + + final ArrayList metricsPerEventNames = newArrayList( + Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate() { + @Override + public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) { + return metricsPerEventName.getEventName().equals(eventName); + } + })); + // TCA policy must have only one metrics per event Name + if (metricsPerEventNames.size() == 1) { + final MetricsPerEventName violatedMetrics = + MetricsPerEventName.copy(metricsPerEventNames.get(0)); + violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold))); + return violatedMetrics; + } else { + final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName); + throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage)); + } + } + + /** + * Computes threshold violations + * + * @param processorContext Filtered processor Context + * @return processor context with any threshold violations + */ + public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) { + final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); + return policyThresholdsProcessor.apply(processorContext); + } + + + /** + * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse} + * formats + * + * @param processorContextWithViolations processor context which has TCA violations + * @param tcaAppName tca app name + * @param isAlertInCEFFormat determines if output alert is in CEF format + * + * @return TCA Alert String + * + * @throws JsonProcessingException If alert cannot be parsed into JSON String + */ + public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations, + final String tcaAppName, + final Boolean isAlertInCEFFormat) throws JsonProcessingException { + if (isAlertInCEFFormat != null && isAlertInCEFFormat) { + final EventListener eventListenerWithViolations = + addThresholdViolationFields(processorContextWithViolations); + final String alertString = writeValueAsString(eventListenerWithViolations); + LOG.debug("Created alert in CEF Format: {}", alertString); + return alertString; + } else { + final TCAVESResponse newTCAVESResponse = + createNewTCAVESResponse(processorContextWithViolations, tcaAppName); + final String alertString = writeValueAsString(newTCAVESResponse); + LOG.debug("Created alert in Non CEF Format: {}", alertString); + return alertString; + } + } + + /** + * Adds threshold violation fields to {@link EventListener} + * + * @param processorContextWithViolations processor context that contains violations + * @return event listener with threshold crossing alert fields populated + */ + public static EventListener addThresholdViolationFields( + final TCACEFProcessorContext processorContextWithViolations) { + + final MetricsPerEventName metricsPerEventName = + processorContextWithViolations.getMetricsPerEventName(); + // confirm violations are indeed present + if (metricsPerEventName == null) { + final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields"; + throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + + // get violated threshold + final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); + final EventListener eventListener = processorContextWithViolations.getCEFEventListener(); + final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); + + // create new threshold crossing alert fields + final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields(); + thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString()); + thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity()); + thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date())); + thresholdCrossingAlertFields.setAlertAction(AlertAction.SET); + thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY); + thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString()); + thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName()); + thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName()); + + // create new performance count + final PerformanceCounter performanceCounter = new PerformanceCounter(); + performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity())); + performanceCounter.setName(violatedThreshold.getFieldPath()); + performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString()); + performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString()); + + // set additional parameters for threshold crossing alert fields + thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter)); + + // add threshold crossing fields to existing event listener + eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields); + + return eventListener; + } + + /** + * Converts {@link EventSeverity} to {@link Criticality} + * + * @param eventSeverity event severity + * + * @return performance counter criticality + */ + private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) { + switch (eventSeverity) { + case CRITICAL: + return Criticality.CRIT; + case MAJOR: + return Criticality.MAJ; + default: + return Criticality.UNKNOWN; + } + } + + /** + * Creates {@link TCAVESResponse} object + * + * @param processorContext processor Context with violations + * @param tcaAppName TCA App Name + * + * @return TCA VES Response Message + */ + public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext, + final String tcaAppName) { + + final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName(); + // confirm violations are indeed present + if (metricsPerEventName == null) { + final String errorMessage = "No violations metrics. Unable to create VES Response"; + throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); + } + + final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); + final EventListener eventListener = processorContext.getCEFEventListener(); + final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); + + final TCAVESResponse tcavesResponse = new TCAVESResponse(); + // ClosedLoopControlName included in the DCAE configuration Policy + tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName()); + // version included in the DCAE configuration Policy + tcavesResponse.setVersion(violatedThreshold.getVersion()); + // Generate a UUID for this output message + tcavesResponse.setRequestID(UUID.randomUUID().toString()); + // commonEventHeader.startEpochMicrosec from the received VES message + tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec()); + // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts + if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) { + tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec()); + } + // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot + tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName); + + final AAI aai = new AAI(); + tcavesResponse.setAai(aai); + + // VM specific settings + if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) { + // Hard Coded - "VM" + tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE); + // Hard Coded - "vserver.vserver-name" + tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET); + // commonEventHeader.sourceName from the received VES message + aai.setGenericServerName(commonEventHeader.getSourceName()); + } else { + // VNF specific settings + // Hard Coded - "VNF" + tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE); + // Hard Coded - "generic-vnf.vnf-name" + tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET); + // commonEventHeader.sourceName from the received VES message + aai.setGenericVNFName(commonEventHeader.getSourceName()); + } + + // Hard Coded - "DCAE" + tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM); + // policyScope included in the DCAE configuration Policy + tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope()); + // policyName included in the DCAE configuration Policy + tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName()); + // policyVersion included in the DCAE configuration Policy + tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion()); + // Extracted from violated threshold + tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name()); + + return tcavesResponse; + } + + + /** + * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert + * + * @param tcavesResponse alert + * + * @return control Loop Schema Type + */ + public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) { + final AAI aai = tcavesResponse.getAai(); + if (aai.getGenericServerName() != null) { + return ControlLoopSchemaType.VM; + } else { + return ControlLoopSchemaType.VNF; + } + } + + /** + * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert + * + * @param tcavesResponse {@link TCAVESResponse} TCA alert + * + * @return Source name + */ + public static String determineSourceName(final TCAVESResponse tcavesResponse) { + final AAI aai = tcavesResponse.getAai(); + if (aai.getGenericServerName() != null) { + return aai.getGenericServerName(); + } else { + return aai.getGenericVNFName(); + } + } + + + /** + * Extract Domain and Event Name from processor context if present + * + * @param processorContext processor context + * @return Tuple of domain and event Name + */ + public static Pair getDomainAndEventName( + @Nullable final TCACEFProcessorContext processorContext) { + + String domain = null; + String eventName = null; + + if (processorContext != null && + processorContext.getCEFEventListener() != null && + processorContext.getCEFEventListener().getEvent() != null && + processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) { + final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent() + .getCommonEventHeader(); + + if (commonEventHeader.getDomain() != null) { + domain = commonEventHeader.getDomain().name(); + } + + if (commonEventHeader.getEventName() != null) { + eventName = commonEventHeader.getEventName(); + } + + } + + return new ImmutablePair<>(domain, eventName); + + } + + /** + * Creates {@link TCAPolicy} Metrics per Event Name list + * + * @param eventNamesMap Map containing event Name as key and corresponding values + * + * @return List of {@link MetricsPerEventName} + */ + public static List createTCAPolicyMetricsPerEventNameList( + final Map> eventNamesMap) { + + // create a new metrics per event Name list + final List metricsPerEventNames = new LinkedList<>(); + + for (Map.Entry> eventNamesEntry : eventNamesMap.entrySet()) { + + // create new metrics per event Name instance + final MetricsPerEventName newMetricsPerEventName = + createNewMetricsPerEventName(eventNamesEntry); + metricsPerEventNames.add(newMetricsPerEventName); + + // determine all threshold related values + final Map thresholdsValuesMaps = + filterMapByKeyNamePrefix(eventNamesEntry.getValue(), + AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX); + + // create a map of all threshold values + final Map> thresholdsMap = + extractSubTree(thresholdsValuesMaps, 1, 2, + AnalyticsConstants.TCA_POLICY_DELIMITER); + + // add thresholds to nmetrics per event Names threshold list + for (Map thresholdMap : thresholdsMap.values()) { + newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap)); + } + + } + + return metricsPerEventNames; + } + + /** + * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap + * + * @param thresholdMap threshold map with threshold values + * + * @return new instance of TCA Policy Threshold + */ + public static Threshold createNewThreshold(final Map thresholdMap) { + final Threshold threshold = new Threshold(); + threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName")); + threshold.setVersion(thresholdMap.get("policy.version")); + threshold.setFieldPath(thresholdMap.get("policy.fieldPath")); + threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction"))); + threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity"))); + threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue"))); + threshold.setClosedLoopEventStatus( + ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus"))); + return threshold; + } + + /** + * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope + * extracted from given eventNamesEntry + * + * @param eventNamesEntry Event Names Entry + * + * @return new instance of MetricsPerEventName + */ + public static MetricsPerEventName createNewMetricsPerEventName( + final Map.Entry> eventNamesEntry) { + // determine event Name + final String eventName = eventNamesEntry.getKey(); + // determine event Name thresholds + final Map metricsPerEventNameThresholdsMap = eventNamesEntry.getValue(); + final MetricsPerEventName metricsPerEventName = new MetricsPerEventName(); + final List thresholds = new LinkedList<>(); + metricsPerEventName.setThresholds(thresholds); + metricsPerEventName.setEventName(eventName); + // bind policyName, policyVersion, policyScope and closedLoopControlName + metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName")); + metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion")); + metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope")); + metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf( + metricsPerEventNameThresholdsMap.get("controlLoopSchemaType"))); + return metricsPerEventName; + } + + /** + * Converts a flattened key/value map which has keys delimited by a given delimiter. + * The start Index and end index extract the sub-key value and returns a new map containing + * sub-keys and values. + * + * @param actualMap actual Map + * @param startIndex start index + * @param endIndex end index + * @param delimiter delimiter + * + * @return Map with new sub tree map + */ + public static Map> extractSubTree( + final Map actualMap, int startIndex, int endIndex, String delimiter) { + + final SortedMap> subTreeMap = new TreeMap<>(); + + // iterate over actual map entries + for (Map.Entry actualMapEntry : actualMap.entrySet()) { + final String actualMapKey = actualMapEntry.getKey(); + final String actualMapValue = actualMapEntry.getValue(); + + // determine delimiter start and end index + final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex); + final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex); + final int keyLength = actualMapKey.length(); + + // extract sub-tree map + if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) { + final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex); + final Map existingThresholdMap = subTreeMap.get(thresholdKey); + final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength); + if (existingThresholdMap == null) { + Map newThresholdMap = new LinkedHashMap<>(); + newThresholdMap.put(subMapKey, actualMapValue); + subTreeMap.put(thresholdKey, newThresholdMap); + } else { + existingThresholdMap.put(subMapKey, actualMapValue); + } + + } + } + + return subTreeMap; + + } + + + /** + * Provides a view of underlying map that filters out entries with keys starting with give prefix + * + * @param actualMap Target map that needs to be filtered + * @param keyNamePrefix key prefix + * + * @return a view of actual map which only show entries which have give prefix + */ + public static Map filterMapByKeyNamePrefix(final Map actualMap, + final String keyNamePrefix) { + return Maps.filterKeys(actualMap, + new Predicate() { + @Override + public boolean apply(@Nullable String key) { + return key != null && key.startsWith(keyNamePrefix); + } + }); + } + + + /** + * Creates Quartz Scheduler + * + * @param pollingIntervalMS polling interval + * @param stdSchedulerFactory Quartz standard schedule factory instance + * @param quartzPublisherPropertiesFileName quartz properties file name + * @param jobDataMap job Data map + * @param quartzJobClass Quartz Job Class + * @param quartzJobName Quartz Job Name + * @param quartzTriggerName Quartz Trigger name + * + * @param An implementation of Quartz {@link Job} interface + * @return Configured Quartz Scheduler + * + * @throws SchedulerException exception if unable to create to Quartz Scheduler + */ + public static Scheduler createQuartzScheduler(final Integer pollingIntervalMS, + final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName, + final JobDataMap jobDataMap, final Class quartzJobClass, final String quartzJobName, + final String quartzTriggerName) throws SchedulerException { + + // Initialize a new Quartz Standard scheduler + LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}", + quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName); + final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile( + quartzPublisherPropertiesFileName, new Properties()); + stdSchedulerFactory.initialize(quartzProperties); + final Scheduler scheduler = stdSchedulerFactory.getScheduler(); + + // Create a new job detail + final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName, + AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build(); + + // Create a new scheduling builder + final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule() + .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule + .repeatForever(); // repeats while worker is running + + // Create a trigger for the TCA Publisher Job + final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger() + .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME) + .startNow() // job starts right away + .withSchedule(simpleScheduleBuilder).build(); + + scheduler.scheduleJob(jobDetail, simpleTrigger); + LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName()); + return scheduler; + } + + + /** + * Does A&AI Enrichment for VM + * + * @param tcavesResponse Outgoing alert object + * @param aaiEnrichmentClient A&AI Enrichment client + * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path + * @param alertString alert String + * @param vmSourceName vm source name + */ + public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse, + final AAIEnrichmentClient aaiEnrichmentClient, + final String aaiVMEnrichmentAPIPath, + final String alertString, + final String vmSourceName) { + + final String filterString = "vserver-name:EQUALS:" + vmSourceName; + final ImmutableMap queryParams = ImmutableMap.of( + "search-node-type", "vserver", "filter", filterString); + + // fetch vm object resource Link from A&AI + final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails( + aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders()); + final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails); + + if (vmObjectResourceLink == null) { + LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " + + "determined for vmSourceName: {}.", alertString, vmSourceName); + } else { + + LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}", + vmSourceName, vmObjectResourceLink); + + // fetch vm A&AI Enrichment + final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails( + vmObjectResourceLink, Collections.emptyMap(), createAAIEnrichmentHeaders()); + + // enrich AAI + enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString, + AnalyticsConstants.AAI_VSERVER_KEY_PREFIX); + } + + + } + + + /** + * Does A&AI Enrichment for VNF + * + * @param tcavesResponse Outgoing alert object + * @param aaiEnrichmentClient A&AI Enrichment client + * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path + * @param alertString alert String + * @param vnfSourceName vnf source name + */ + public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse, + final AAIEnrichmentClient aaiEnrichmentClient, + final String aaiVNFEnrichmentAPIPath, + final String alertString, + final String vnfSourceName) { + final ImmutableMap queryParams = ImmutableMap.of("vnf-name", vnfSourceName); + + // fetch vnf A&AI Enrichment + final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails( + aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders()); + + // enrich alert AAI + enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX); + } + + /** + * Fetches VM Object Resource Link from A&AI Resource Link Json + * + * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json + * + * @return object resource link String + */ + private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) { + if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) { + try { + final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails); + final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link"); + if (!resourceLinkJsonNode.isMissingNode()) { + return resourceLinkJsonNode.asText(); + } + } catch (IOException e) { + LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}", + vmAAIResourceLinkDetails, e); + } + } + return null; + } + + /** + * Creates Http Headers for A&AI Enrichment client + * + * @return Http Headers Map for A&AI Enrichment client + */ + private static Map createAAIEnrichmentHeaders() { + final Map aaiEnrichmentHeaders = new LinkedHashMap<>(); + final String transactionId = Long.toString(new Date().getTime()); + aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca"); + aaiEnrichmentHeaders.put("X-TransactionId", transactionId); + aaiEnrichmentHeaders.put("Accept", "application/json"); + aaiEnrichmentHeaders.put("Real-Time", "true"); + aaiEnrichmentHeaders.put("Content-Type", "application/json"); + return aaiEnrichmentHeaders; + } + + + /** + * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object + * + * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details + * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String + * @param alertString Alert String + * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record + */ + private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails, + final String alertString, final String keyPrefix) { + + if (aaiEnrichmentDetails == null) { + LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." + + "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString); + + } else { + + final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails); + + if (enrichmentDetailsAAI != null) { + final Set> enrichedAAIEntrySet = + enrichmentDetailsAAI.getDynamicProperties().entrySet(); + final Map preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties(); + + // populate A&AI Enrichment details and add prefix to key + for (Map.Entry enrichedAAIEntry : enrichedAAIEntrySet) { + preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(), + enrichedAAIEntry.getValue()); + } + + LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}", + alertString, preEnrichmentAAI); + } else { + LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." + + "Skipping Enrichment for alert message: {}", + preEnrichmentAAI, aaiEnrichmentDetails, alertString); + } + } + + } + + /** + * Creates a new A&AI object with only top level A&AI Enrichment details + * + * @param aaiEnrichmentDetails A&AI Enrichment details + * + * @return new A&AI with only top level A&AI Enrichment details + */ + private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) { + try { + final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails); + final Iterator> fieldsIterator = rootNode.fields(); + while (fieldsIterator.hasNext()) { + final Map.Entry fieldEntry = fieldsIterator.next(); + final JsonNode jsonNode = fieldEntry.getValue(); + // remove all arrays, objects from A&AI Enrichment Json + if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) { + fieldsIterator.remove(); + } + } + return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class); + } catch (IOException e) { + LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e); + } + return null; + } + +} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java deleted file mode 100644 index 1540896..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; - -/** - *

- * Encapsulates common functionality for all TCA CEF Policy Processors - *

- * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public abstract class AbstractTCAECEFPolicyProcessor extends AbstractMessageProcessor { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractTCAECEFPolicyProcessor.class); - - /** - * For all TCA Policy Processor the pre processor ensures that {@link EventListener} object is - * present - * - * @param processorContext incoming Processor Context - * @return Pre processed Processor Context - */ - @Override - public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { - // validates CEF Event Listener is Present - final EventListener cefEventListener = processorContext.getCEFEventListener(); - if (cefEventListener == null) { - final String errorMessage = String.format( - "CEF Event Listener is not Present.Invalid use of Processor: %s. CEF Message: %s", - getProcessorInfo().getProcessorName(), processorContext.getMessage()); - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - return super.preProcessor(processorContext); - } -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java deleted file mode 100644 index 943270e..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.apache.commons.lang3.StringUtils; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - *

- * Processor that converts incoming presumed JSON string CEF message to {@link EventListener} object - *
- * Pre Conditions: None - *

- * - * @author Rajiv Singla . Creation Date: 11/5/2016. - */ -public class TCACEFJsonProcessor extends AbstractMessageProcessor { - - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(TCACEFJsonProcessor.class); - - - @Override - public String getProcessorDescription() { - return "Converts incoming TCA CEF Message to Event Listener object"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - final String cefMessage = processorContext.getMessage(); - - // If CEF Message is null then processor should stop processing - if (cefMessage == null) { - String errorMessage = "Null CEF message cannot be converted to CEF Event Listener Object"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - // If CEF Message is blank then processor stop processing - if (StringUtils.isBlank(cefMessage)) { - setTerminatingProcessingMessage("Blank CEF message cannot be converted to CEF Event Listener Object", - processorContext); - return processorContext; - } - - // trim cef message - final String trimmedCEFMessage = cefMessage.trim(); - - // if message does not start with curly brace and ends with curly brace, it is not a valid cef message - // processor will stop processing - if (!(trimmedCEFMessage.startsWith("{") && trimmedCEFMessage.endsWith("}"))) { - setTerminatingProcessingMessage("CEF Message must start with curly brace and must end with curly brace", - processorContext); - return processorContext; - } - - // try parsing the cef message - try { - final EventListener eventListener = TCAUtils.readValue(trimmedCEFMessage, EventListener.class); - setFinishedProcessingMessage("CEF JSON to Event Listener Conversion Successful", processorContext); - // set new Event Listener in the Processor Context - processorContext.setCEFEventListener(eventListener); - return processorContext; - } catch (IOException e) { - final String errorMessage = String.format("Parsing Failed for CEF Message: %s, Error: %s", cefMessage, e); - // If parsing fails throw an exception - throw new MessageProcessingException(errorMessage, LOG, e); - } - - } -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java deleted file mode 100644 index 3819d2c..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; - -/** - *

- * TCA Processor which acts like a filter to filter out messages which does not belong to TCA Policy Domain - *
- * Pre Conditions: CEF Event Listener must be present - *

- * - * @author Rajiv Singla . Creation Date: 11/7/2016. - */ -public class TCACEFPolicyDomainFilter extends AbstractTCAECEFPolicyProcessor { - - - private static final long serialVersionUID = 1L; - - @Override - public String getProcessorDescription() { - return "Filters out CEF Messages which does not match TCAPolicy Domain"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - // Safe to get event Listener here without null check as pre processor will validate if - // event listener is indeed present - final EventListener eventListener = processorContext.getCEFEventListener(); - - Domain cefMessageDomain; - - // Extract CEF domain as it is must be present as per CEF Schema - if (eventListener.getEvent() != null && - eventListener.getEvent().getCommonEventHeader() != null && - eventListener.getEvent().getCommonEventHeader().getDomain() != null) { - cefMessageDomain = eventListener.getEvent().getCommonEventHeader().getDomain(); - - } else { - final String terminatingMessage = "Invalid CEF Message.Common Event Header Domain not present."; - setTerminatingProcessingMessage(terminatingMessage, processorContext); - return processorContext; - } - - // Get Policy Domain. TCA Policy Validation must ensure that Domain is indeed present - // no null check will be required here - final String policyDomain = processorContext.getTCAPolicy().getDomain(); - - // If Policy domain matches CEF message domain then continue processing - if (cefMessageDomain.toString().equalsIgnoreCase(policyDomain)) { - final String finishMessage = String.format("Policy Domain and CEF Message Domain match successful." + - " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); - setFinishedProcessingMessage(finishMessage, processorContext); - } else { - // If policy domain does not match with CEF message terminate processing chain - final String terminatingMessage = String.format("Policy Domain and CEF Message Domain match unsuccessful." + - " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); - setTerminatingProcessingMessage(terminatingMessage, processorContext); - } - - return processorContext; - } -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java deleted file mode 100644 index ddf3888..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import com.google.common.base.Joiner; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; - -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.getPolicyEventNamesSupplier; - -/** - *

- * TCA Processor that acts like a filter to filter out messages which does not belong to TCA Policy Event Name - *
- * Pre Conditions: CEF Event Listener must be present - *

- * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFPolicyEventNameFilter extends AbstractTCAECEFPolicyProcessor { - - private static final long serialVersionUID = 1L; - - @Override - public String getProcessorDescription() { - return "Filters out CEF Messages which does not match Policy Functional Roles"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - // Safe to get event Listener here without null check as pre processor will validate if - // event listener is indeed present - final EventListener eventListener = processorContext.getCEFEventListener(); - - String cefMessageEventName; - - if (eventListener.getEvent() != null && - eventListener.getEvent().getCommonEventHeader() != null && - eventListener.getEvent().getCommonEventHeader().getEventName() != null) { - cefMessageEventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - } else { - String terminationMessage = "Invalid CEF Message.Common Event Header Event Name not present."; - setTerminatingProcessingMessage(terminationMessage, processorContext); - return processorContext; - } - - // Determine Policy Functional Roles - final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); - final List policyEventNames = getPolicyEventNamesSupplier(tcaPolicy).get(); - final String policyEventNamesString = Joiner.on(",").join(policyEventNames); - - // If Policy event names contains CEF message event names then continue processing - if (policyEventNames.contains(cefMessageEventName)) { - final String finishMessage = String.format( - "Policy Event Name and CEF Message Event Name match successful." + - "Message EventName: %s, Policy Event Names: %s", - cefMessageEventName, policyEventNamesString); - setFinishedProcessingMessage(finishMessage, processorContext); - } else { - // If Policy event names does not contain CEF message event names then terminate processing - final String terminatingMessage = String.format( - "Policy Event name and CEF Message Event name match unsuccessful." + - "Message EventName: %s, Policy Event Names: %s", - cefMessageEventName, policyEventNamesString); - setTerminatingProcessingMessage(terminatingMessage, processorContext); - } - - return processorContext; - } -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java deleted file mode 100644 index 0a62aa4..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import com.google.common.base.Optional; -import com.google.common.collect.Table; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.math.BigDecimal; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nonnull; - -/** - *

- * TCA CEF Policy Threshold processor - *
- * Pre Conditions: Domain and Functional Role must be present in CEF Event Listener Object - *

- * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFPolicyThresholdsProcessor extends AbstractTCAECEFPolicyProcessor { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(TCACEFPolicyThresholdsProcessor.class); - - @Override - public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { - // validates Domain and Functional Role are present - final EventListener eventListener = processorContext.getCEFEventListener(); - final Domain domain = eventListener.getEvent().getCommonEventHeader().getDomain(); - final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - if (domain == null || eventName == null) { - final String errorMessage = "CEF Event Listener domain or eventName not Present. " + - "Invalid use of this Processor"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - return super.preProcessor(processorContext); - } - - @Override - public String getProcessorDescription() { - return "Applies TCA Policy rules to incoming CEF message. If any thresholds are violated attaches max " + - "Severity violated threshold to TCA Processor Context"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - final String cefMessage = processorContext.getMessage(); - - // Determine domain and eventName - final EventListener eventListener = processorContext.getCEFEventListener(); - final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - - // Get Table containing event Name and Thresholds Field Path - final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); - final Table> eventNameFieldPathsTable = - TCAUtils.getPolicyEventNameThresholdsTableSupplier(tcaPolicy).get(); - - // Get Policy Field Paths for that event Name - final Map> policyFieldPathsMap = eventNameFieldPathsTable.row(eventName); - final Set policyFieldPaths = policyFieldPathsMap.keySet(); - - // Get Json Values for Policy Fields - final Map> messageFieldValuesMap = - TCAUtils.getJsonPathValue(cefMessage, policyFieldPaths); - - // Determine all violated thresholds per message field Path - final Map violatedThresholdsMap = new HashMap<>(); - for (Map.Entry> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) { - final String messageFieldPath = messageFieldValuesMapEntry.getKey(); - final List messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath); - if (messageFieldAssociatedPolicyThresholds != null) { - final Optional thresholdOptional = TCAUtils.thresholdCalculator( - messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds); - if (thresholdOptional.isPresent()) { - violatedThresholdsMap.put(messageFieldPath, thresholdOptional.get()); - } - } - } - - // No threshold were violated - if (violatedThresholdsMap.isEmpty()) { - - final String terminationMessage = "No Policy Threshold violated by the VES CEF Message."; - setTerminatingProcessingMessage(terminationMessage, processorContext); - - } else { - - // If there are policy violations then determine max priority violation - final Threshold maxSeverityThresholdViolation = - TCAUtils.prioritizeThresholdViolations(violatedThresholdsMap); - final MetricsPerEventName violatedMetrics = TCAUtils.createViolatedMetrics(tcaPolicy, - maxSeverityThresholdViolation, eventName); - // attach policy violation to processor Context - processorContext.setMetricsPerEventName(violatedMetrics); - - final String finishMessage = String.format("Policy Threshold violation detected for threshold: %s", - maxSeverityThresholdViolation); - setFinishedProcessingMessage(finishMessage, processorContext); - - } - - return processorContext; - } -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java deleted file mode 100644 index 80dfca0..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractProcessorContext; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; - -/** - * TCA CEF Policy Processor Context - * - * @author Rajiv Singla . Creation Date: 11/7/2016. - */ -public class TCACEFProcessorContext extends AbstractProcessorContext { - - private static final long serialVersionUID = 1L; - - private final TCAPolicy tcaPolicy; - private EventListener eventListener; - private MetricsPerEventName metricsPerEventName; - - public TCACEFProcessorContext(final String message, boolean canProcessingContinue, final TCAPolicy tcaPolicy) { - super(message, canProcessingContinue); - this.tcaPolicy = tcaPolicy; - // present only if cef incoming message can be parsed successfully to Event Listener Object - this.eventListener = null; - // present only if there are any threshold violations are detected - this.metricsPerEventName = null; - } - - // Auxiliary Constructor which default canProcessingContinue Flag to true - public TCACEFProcessorContext(final String message, final TCAPolicy tcaPolicy) { - this(message, true, tcaPolicy); - } - - /** - * Returns {@link TCAPolicy} Object - * - * @return TCA Policy - */ - public TCAPolicy getTCAPolicy() { - return tcaPolicy; - } - - /** - * Returns Common Event Format {@link EventListener} if present else null - * - * @return CEF Event Listener - */ - public EventListener getCEFEventListener() { - return eventListener; - } - - - /** - * Sets new {@link EventListener} - * - * @param eventListener set new value for CEF event listener - */ - public void setCEFEventListener(final EventListener eventListener) { - this.eventListener = eventListener; - } - - - /** - * Returns TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message if - * present else null - * - * @return Violated Threshold - */ - public MetricsPerEventName getMetricsPerEventName() { - return metricsPerEventName; - } - - /** - * Assign new TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message - * - * @param metricsPerEventName new value for Metrics Per Functional Role with violated threshold - */ - public void setMetricsPerEventName(MetricsPerEventName metricsPerEventName) { - this.metricsPerEventName = metricsPerEventName; - } - -} diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java deleted file mode 100644 index e6e96f1..0000000 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java +++ /dev/null @@ -1,1016 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.utils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Table; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.TypeRef; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction; -import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType; -import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; -import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter; -import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI; -import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; -import org.quartz.Job; -import org.quartz.JobBuilder; -import org.quartz.JobDataMap; -import org.quartz.JobDetail; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SimpleScheduleBuilder; -import org.quartz.SimpleTrigger; -import org.quartz.TriggerBuilder; -import org.quartz.impl.StdSchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import static com.google.common.collect.Lists.newArrayList; -import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT; - -/** - * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get - * pre configured Json Object Mapper understand serialization and deserialization of CEF Message - * and TCA Policy - * - * @author Rajiv Singla . Creation Date: 10/24/2016. - */ -public abstract class TCAUtils extends AnalyticsModelJsonUtils { - - private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class); - - /** - * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR, - * WARNING ) - */ - private static final Comparator THRESHOLD_COMPARATOR = new Comparator() { - @Override - public int compare(Threshold threshold1, Threshold threshold2) { - return threshold1.getSeverity().compareTo(threshold2.getSeverity()); - } - }; - - /** - * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy} - * - * @return TCA Policy Metrics Per Event Name list - */ - public static Function> tcaPolicyMetricsExtractorFunction() { - return new Function>() { - @Nullable - @Override - public List apply(@Nonnull TCAPolicy tcaPolicy) { - return tcaPolicy.getMetricsPerEventName(); - } - }; - } - - /** - * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from - * {@link MetricsPerEventName} - * - * @return Event Names or a Metrics Per Event Name object - */ - public static Function tcaEventNameExtractorFunction() { - return new Function() { - @Override - public String apply(@Nonnull MetricsPerEventName metricsPerEventName) { - return metricsPerEventName.getEventName(); - } - }; - } - - - /** - * Extracts {@link TCAPolicy} Event Names - * - * @param tcaPolicy TCA Policy - * @return List of event names in the TCA Policy - */ - public static List getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) { - final List metricsPerEventNames = - tcaPolicyMetricsExtractorFunction().apply(tcaPolicy); - - return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction()); - } - - /** - * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to - * change during runtime - * - * @param tcaPolicy TCA Policy - * @return a Supplier that memoize the TCA Policy event names - */ - public static Supplier> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) { - return Suppliers.memoize(new Supplier>() { - @Override - public List get() { - return getPolicyEventNames(tcaPolicy); - } - }); - } - - - /** - * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path - * - * @param tcaPolicy TCA Policy - * @return A table with Keys of event name and field path containing List of threshold as values - */ - public static Table> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) { - final Table> domainFRTable = HashBasedTable.create(); - for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) { - final String eventName = metricsPerEventName.getEventName(); - final List thresholds = metricsPerEventName.getThresholds(); - for (Threshold threshold : thresholds) { - final List existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath()); - if (existingThresholds == null) { - final LinkedList newThresholdList = new LinkedList<>(); - newThresholdList.add(threshold); - domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList); - } else { - domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold); - } - } - } - return domainFRTable; - } - - - /** - * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table - * - * @param tcaPolicy TCA Policy - * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values - */ - public static Supplier>> getPolicyEventNameThresholdsTableSupplier - (final TCAPolicy tcaPolicy) { - return Suppliers.memoize(new Supplier>>() { - @Override - public Table> get() { - return getPolicyEventNameThresholdsTable(tcaPolicy); - } - }); - } - - - /** - * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor}, - * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to - * filter out messages which does not match policy domain or event Name - * - * @param cefMessage CEF Message - * @param tcaPolicy TCA Policy - * @return Message Process Context after processing filter chain - */ - public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage, - @Nonnull final TCAPolicy tcaPolicy) { - - final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor(); - final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter(); - final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter(); - // Create a list of message processors - final ImmutableList> messageProcessors = - ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter); - final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy); - // Create a message processors chain - final GenericMessageChainProcessor tcaProcessingChain = - new GenericMessageChainProcessor<>(messageProcessors, processorContext); - // process chain - return tcaProcessingChain.processChain(); - } - - - /** - * Extracts json path values for given json Field Paths from using Json path notation. Assumes - * that values extracted are always long - * - * @param message CEF Message - * @param jsonFieldPaths Json Field Paths - * @return Map containing key as json path and values as values associated with that json path - */ - public static Map> getJsonPathValue(@Nonnull String message, @Nonnull Set - jsonFieldPaths) { - - final Map> jsonFieldPathMap = new HashMap<>(); - final DocumentContext documentContext = JsonPath.parse(message); - - for (String jsonFieldPath : jsonFieldPaths) { - List jsonFieldValues = null; - - try { - jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef>() { - }); - } catch (Exception e) { - final String errorMessage = String.format( - "Unable to convert jsonFieldPath: %s value to valid number. " + - "Json Path value is not in a valid number format. Incoming message: %s", - jsonFieldPath, message); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - // If Json Field Values are not or empty - if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) { - // Filter out all null values in the filed values list - final List nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues, - Predicates.notNull())); - // If there are non null values put them in the map - if (!nonNullValues.isEmpty()) { - jsonFieldPathMap.put(jsonFieldPath, nonNullValues); - } - } - } - - return jsonFieldPathMap; - } - - /** - * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path - * it applies threshold in order of their severity and record the first threshold per message field path - * - * @param messageFieldValues Field Path Values extracted from CEF Message - * @param fieldThresholds Policy Thresholds for Field Path - * @return Optional of violated threshold for a field path - */ - public static Optional thresholdCalculator(final List messageFieldValues, final - List - fieldThresholds) { - // order thresholds by severity - Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR); - // Now apply each threshold to field values - for (Threshold fieldThreshold : fieldThresholds) { - for (BigDecimal messageFieldValue : messageFieldValues) { - final Boolean isThresholdViolated = - fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold - .getThresholdValue())); - if (isThresholdViolated) { - final Threshold violatedThreshold = Threshold.copy(fieldThreshold); - violatedThreshold.setActualFieldValue(messageFieldValue); - return Optional.of(violatedThreshold); - } - } - } - return Optional.absent(); - } - - /** - * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message. - * Grabs first highest priority violated threshold - * - * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds - * @return First Highest priority violated threshold - */ - public static Threshold prioritizeThresholdViolations(final Map violatedThresholdsMap) { - - final List violatedThresholds = newArrayList(violatedThresholdsMap.values()); - - if (violatedThresholds.size() == 1) { - return violatedThresholds.get(0); - } - Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR); - // Just grab the first violated threshold with highest priority - return violatedThresholds.get(0); - } - - - /** - * Creates {@link MetricsPerEventName} object which contains violated thresholds - * - * @param tcaPolicy TCA Policy - * @param violatedThreshold Violated thresholds - * @param eventName Event Name - * - * @return MetricsPerEventName object containing one highest severity violated threshold - */ - public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy, - @Nonnull final Threshold violatedThreshold, - @Nonnull final String eventName) { - - final ArrayList metricsPerEventNames = newArrayList( - Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate() { - @Override - public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) { - return metricsPerEventName.getEventName().equals(eventName); - } - })); - // TCA policy must have only one metrics per event Name - if (metricsPerEventNames.size() == 1) { - final MetricsPerEventName violatedMetrics = - MetricsPerEventName.copy(metricsPerEventNames.get(0)); - violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold))); - return violatedMetrics; - } else { - final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName); - throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage)); - } - } - - /** - * Computes threshold violations - * - * @param processorContext Filtered processor Context - * @return processor context with any threshold violations - */ - public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) { - final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); - return policyThresholdsProcessor.apply(processorContext); - } - - - /** - * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse} - * formats - * - * @param processorContextWithViolations processor context which has TCA violations - * @param tcaAppName tca app name - * @param isAlertInCEFFormat determines if output alert is in CEF format - * - * @return TCA Alert String - * - * @throws JsonProcessingException If alert cannot be parsed into JSON String - */ - public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations, - final String tcaAppName, - final Boolean isAlertInCEFFormat) throws JsonProcessingException { - if (isAlertInCEFFormat != null && isAlertInCEFFormat) { - final EventListener eventListenerWithViolations = - addThresholdViolationFields(processorContextWithViolations); - final String alertString = writeValueAsString(eventListenerWithViolations); - LOG.debug("Created alert in CEF Format: {}", alertString); - return alertString; - } else { - final TCAVESResponse newTCAVESResponse = - createNewTCAVESResponse(processorContextWithViolations, tcaAppName); - final String alertString = writeValueAsString(newTCAVESResponse); - LOG.debug("Created alert in Non CEF Format: {}", alertString); - return alertString; - } - } - - /** - * Adds threshold violation fields to {@link EventListener} - * - * @param processorContextWithViolations processor context that contains violations - * @return event listener with threshold crossing alert fields populated - */ - public static EventListener addThresholdViolationFields( - final TCACEFProcessorContext processorContextWithViolations) { - - final MetricsPerEventName metricsPerEventName = - processorContextWithViolations.getMetricsPerEventName(); - // confirm violations are indeed present - if (metricsPerEventName == null) { - final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - // get violated threshold - final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); - final EventListener eventListener = processorContextWithViolations.getCEFEventListener(); - final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); - - // create new threshold crossing alert fields - final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields(); - thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString()); - thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity()); - thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date())); - thresholdCrossingAlertFields.setAlertAction(AlertAction.SET); - thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY); - thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString()); - thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName()); - thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName()); - - // create new performance count - final PerformanceCounter performanceCounter = new PerformanceCounter(); - performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity())); - performanceCounter.setName(violatedThreshold.getFieldPath()); - performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString()); - performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString()); - - // set additional parameters for threshold crossing alert fields - thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter)); - - // add threshold crossing fields to existing event listener - eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields); - - return eventListener; - } - - /** - * Converts {@link EventSeverity} to {@link Criticality} - * - * @param eventSeverity event severity - * - * @return performance counter criticality - */ - private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) { - switch (eventSeverity) { - case CRITICAL: - return Criticality.CRIT; - case MAJOR: - return Criticality.MAJ; - default: - return Criticality.UNKNOWN; - } - } - - /** - * Creates {@link TCAVESResponse} object - * - * @param processorContext processor Context with violations - * @param tcaAppName TCA App Name - * - * @return TCA VES Response Message - */ - public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext, - final String tcaAppName) { - - final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName(); - // confirm violations are indeed present - if (metricsPerEventName == null) { - final String errorMessage = "No violations metrics. Unable to create VES Response"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); - final EventListener eventListener = processorContext.getCEFEventListener(); - final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); - - final TCAVESResponse tcavesResponse = new TCAVESResponse(); - // ClosedLoopControlName included in the DCAE configuration Policy - tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName()); - // version included in the DCAE configuration Policy - tcavesResponse.setVersion(violatedThreshold.getVersion()); - // Generate a UUID for this output message - tcavesResponse.setRequestID(UUID.randomUUID().toString()); - // commonEventHeader.startEpochMicrosec from the received VES message - tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec()); - // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts - if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) { - tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec()); - } - // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot - tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName); - - final AAI aai = new AAI(); - tcavesResponse.setAai(aai); - - // VM specific settings - if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) { - // Hard Coded - "VM" - tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE); - // Hard Coded - "vserver.vserver-name" - tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET); - // commonEventHeader.sourceName from the received VES message - aai.setGenericServerName(commonEventHeader.getSourceName()); - } else { - // VNF specific settings - // Hard Coded - "VNF" - tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE); - // Hard Coded - "generic-vnf.vnf-name" - tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET); - // commonEventHeader.sourceName from the received VES message - aai.setGenericVNFName(commonEventHeader.getSourceName()); - } - - // Hard Coded - "DCAE" - tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM); - // policyScope included in the DCAE configuration Policy - tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope()); - // policyName included in the DCAE configuration Policy - tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName()); - // policyVersion included in the DCAE configuration Policy - tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion()); - // Extracted from violated threshold - tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name()); - - return tcavesResponse; - } - - - /** - * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert - * - * @param tcavesResponse alert - * - * @return control Loop Schema Type - */ - public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) { - final AAI aai = tcavesResponse.getAai(); - if (aai.getGenericServerName() != null) { - return ControlLoopSchemaType.VM; - } else { - return ControlLoopSchemaType.VNF; - } - } - - /** - * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert - * - * @param tcavesResponse {@link TCAVESResponse} TCA alert - * - * @return Source name - */ - public static String determineSourceName(final TCAVESResponse tcavesResponse) { - final AAI aai = tcavesResponse.getAai(); - if (aai.getGenericServerName() != null) { - return aai.getGenericServerName(); - } else { - return aai.getGenericVNFName(); - } - } - - - /** - * Extract Domain and Event Name from processor context if present - * - * @param processorContext processor context - * @return Tuple of domain and event Name - */ - public static Pair getDomainAndEventName( - @Nullable final TCACEFProcessorContext processorContext) { - - String domain = null; - String eventName = null; - - if (processorContext != null && - processorContext.getCEFEventListener() != null && - processorContext.getCEFEventListener().getEvent() != null && - processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) { - final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent() - .getCommonEventHeader(); - - if (commonEventHeader.getDomain() != null) { - domain = commonEventHeader.getDomain().name(); - } - - if (commonEventHeader.getEventName() != null) { - eventName = commonEventHeader.getEventName(); - } - - } - - return new ImmutablePair<>(domain, eventName); - - } - - /** - * Creates {@link TCAPolicy} Metrics per Event Name list - * - * @param eventNamesMap Map containing event Name as key and corresponding values - * - * @return List of {@link MetricsPerEventName} - */ - public static List createTCAPolicyMetricsPerEventNameList( - final Map> eventNamesMap) { - - // create a new metrics per event Name list - final List metricsPerEventNames = new LinkedList<>(); - - for (Map.Entry> eventNamesEntry : eventNamesMap.entrySet()) { - - // create new metrics per event Name instance - final MetricsPerEventName newMetricsPerEventName = - createNewMetricsPerEventName(eventNamesEntry); - metricsPerEventNames.add(newMetricsPerEventName); - - // determine all threshold related values - final Map thresholdsValuesMaps = - filterMapByKeyNamePrefix(eventNamesEntry.getValue(), - AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX); - - // create a map of all threshold values - final Map> thresholdsMap = - extractSubTree(thresholdsValuesMaps, 1, 2, - AnalyticsConstants.TCA_POLICY_DELIMITER); - - // add thresholds to nmetrics per event Names threshold list - for (Map thresholdMap : thresholdsMap.values()) { - newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap)); - } - - } - - return metricsPerEventNames; - } - - /** - * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap - * - * @param thresholdMap threshold map with threshold values - * - * @return new instance of TCA Policy Threshold - */ - public static Threshold createNewThreshold(final Map thresholdMap) { - final Threshold threshold = new Threshold(); - threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName")); - threshold.setVersion(thresholdMap.get("policy.version")); - threshold.setFieldPath(thresholdMap.get("policy.fieldPath")); - threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction"))); - threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity"))); - threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue"))); - threshold.setClosedLoopEventStatus( - ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus"))); - return threshold; - } - - /** - * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope - * extracted from given eventNamesEntry - * - * @param eventNamesEntry Event Names Entry - * - * @return new instance of MetricsPerEventName - */ - public static MetricsPerEventName createNewMetricsPerEventName( - final Map.Entry> eventNamesEntry) { - // determine event Name - final String eventName = eventNamesEntry.getKey(); - // determine event Name thresholds - final Map metricsPerEventNameThresholdsMap = eventNamesEntry.getValue(); - final MetricsPerEventName metricsPerEventName = new MetricsPerEventName(); - final List thresholds = new LinkedList<>(); - metricsPerEventName.setThresholds(thresholds); - metricsPerEventName.setEventName(eventName); - // bind policyName, policyVersion, policyScope and closedLoopControlName - metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName")); - metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion")); - metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope")); - metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf( - metricsPerEventNameThresholdsMap.get("controlLoopSchemaType"))); - return metricsPerEventName; - } - - /** - * Converts a flattened key/value map which has keys delimited by a given delimiter. - * The start Index and end index extract the sub-key value and returns a new map containing - * sub-keys and values. - * - * @param actualMap actual Map - * @param startIndex start index - * @param endIndex end index - * @param delimiter delimiter - * - * @return Map with new sub tree map - */ - public static Map> extractSubTree( - final Map actualMap, int startIndex, int endIndex, String delimiter) { - - final SortedMap> subTreeMap = new TreeMap<>(); - - // iterate over actual map entries - for (Map.Entry actualMapEntry : actualMap.entrySet()) { - final String actualMapKey = actualMapEntry.getKey(); - final String actualMapValue = actualMapEntry.getValue(); - - // determine delimiter start and end index - final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex); - final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex); - final int keyLength = actualMapKey.length(); - - // extract sub-tree map - if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) { - final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex); - final Map existingThresholdMap = subTreeMap.get(thresholdKey); - final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength); - if (existingThresholdMap == null) { - Map newThresholdMap = new LinkedHashMap<>(); - newThresholdMap.put(subMapKey, actualMapValue); - subTreeMap.put(thresholdKey, newThresholdMap); - } else { - existingThresholdMap.put(subMapKey, actualMapValue); - } - - } - } - - return subTreeMap; - - } - - - /** - * Provides a view of underlying map that filters out entries with keys starting with give prefix - * - * @param actualMap Target map that needs to be filtered - * @param keyNamePrefix key prefix - * - * @return a view of actual map which only show entries which have give prefix - */ - public static Map filterMapByKeyNamePrefix(final Map actualMap, - final String keyNamePrefix) { - return Maps.filterKeys(actualMap, - new Predicate() { - @Override - public boolean apply(@Nullable String key) { - return key != null && key.startsWith(keyNamePrefix); - } - }); - } - - - /** - * Creates Quartz Scheduler - * - * @param pollingIntervalMS polling interval - * @param stdSchedulerFactory Quartz standard schedule factory instance - * @param quartzPublisherPropertiesFileName quartz properties file name - * @param jobDataMap job Data map - * @param quartzJobClass Quartz Job Class - * @param quartzJobName Quartz Job Name - * @param quartzTriggerName Quartz Trigger name - * - * @param An implementation of Quartz {@link Job} interface - * @return Configured Quartz Scheduler - * - * @throws SchedulerException exception if unable to create to Quartz Scheduler - */ - public static Scheduler createQuartzScheduler(final Integer pollingIntervalMS, - final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName, - final JobDataMap jobDataMap, final Class quartzJobClass, final String quartzJobName, - final String quartzTriggerName) throws SchedulerException { - - // Initialize a new Quartz Standard scheduler - LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}", - quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName); - final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile( - quartzPublisherPropertiesFileName, new Properties()); - stdSchedulerFactory.initialize(quartzProperties); - final Scheduler scheduler = stdSchedulerFactory.getScheduler(); - - // Create a new job detail - final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName, - AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build(); - - // Create a new scheduling builder - final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule() - .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule - .repeatForever(); // repeats while worker is running - - // Create a trigger for the TCA Publisher Job - final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger() - .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME) - .startNow() // job starts right away - .withSchedule(simpleScheduleBuilder).build(); - - scheduler.scheduleJob(jobDetail, simpleTrigger); - LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName()); - return scheduler; - } - - - /** - * Does A&AI Enrichment for VM - * - * @param tcavesResponse Outgoing alert object - * @param aaiEnrichmentClient A&AI Enrichment client - * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path - * @param alertString alert String - * @param vmSourceName vm source name - */ - public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse, - final AAIEnrichmentClient aaiEnrichmentClient, - final String aaiVMEnrichmentAPIPath, - final String alertString, - final String vmSourceName) { - - final String filterString = "vserver-name:EQUALS:" + vmSourceName; - final ImmutableMap queryParams = ImmutableMap.of( - "search-node-type", "vserver", "filter", filterString); - - // fetch vm object resource Link from A&AI - final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails( - aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders()); - final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails); - - if (vmObjectResourceLink == null) { - LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " + - "determined for vmSourceName: {}.", alertString, vmSourceName); - } else { - - LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}", - vmSourceName, vmObjectResourceLink); - - // fetch vm A&AI Enrichment - final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails( - vmObjectResourceLink, Collections.emptyMap(), createAAIEnrichmentHeaders()); - - // enrich AAI - enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString, - AnalyticsConstants.AAI_VSERVER_KEY_PREFIX); - } - - - } - - - /** - * Does A&AI Enrichment for VNF - * - * @param tcavesResponse Outgoing alert object - * @param aaiEnrichmentClient A&AI Enrichment client - * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path - * @param alertString alert String - * @param vnfSourceName vnf source name - */ - public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse, - final AAIEnrichmentClient aaiEnrichmentClient, - final String aaiVNFEnrichmentAPIPath, - final String alertString, - final String vnfSourceName) { - final ImmutableMap queryParams = ImmutableMap.of("vnf-name", vnfSourceName); - - // fetch vnf A&AI Enrichment - final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails( - aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders()); - - // enrich alert AAI - enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX); - } - - /** - * Fetches VM Object Resource Link from A&AI Resource Link Json - * - * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json - * - * @return object resource link String - */ - private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) { - if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) { - try { - final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails); - final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link"); - if (!resourceLinkJsonNode.isMissingNode()) { - return resourceLinkJsonNode.asText(); - } - } catch (IOException e) { - LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}", - vmAAIResourceLinkDetails, e); - } - } - return null; - } - - /** - * Creates Http Headers for A&AI Enrichment client - * - * @return Http Headers Map for A&AI Enrichment client - */ - private static Map createAAIEnrichmentHeaders() { - final Map aaiEnrichmentHeaders = new LinkedHashMap<>(); - final String transactionId = Long.toString(new Date().getTime()); - aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca"); - aaiEnrichmentHeaders.put("X-TransactionId", transactionId); - aaiEnrichmentHeaders.put("Accept", "application/json"); - aaiEnrichmentHeaders.put("Real-Time", "true"); - aaiEnrichmentHeaders.put("Content-Type", "application/json"); - return aaiEnrichmentHeaders; - } - - - /** - * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object - * - * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details - * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String - * @param alertString Alert String - * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record - */ - private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails, - final String alertString, final String keyPrefix) { - - if (aaiEnrichmentDetails == null) { - LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." + - "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString); - - } else { - - final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails); - - if (enrichmentDetailsAAI != null) { - final Set> enrichedAAIEntrySet = - enrichmentDetailsAAI.getDynamicProperties().entrySet(); - final Map preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties(); - - // populate A&AI Enrichment details and add prefix to key - for (Map.Entry enrichedAAIEntry : enrichedAAIEntrySet) { - preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(), - enrichedAAIEntry.getValue()); - } - - LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}", - alertString, preEnrichmentAAI); - } else { - LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." + - "Skipping Enrichment for alert message: {}", - preEnrichmentAAI, aaiEnrichmentDetails, alertString); - } - } - - } - - /** - * Creates a new A&AI object with only top level A&AI Enrichment details - * - * @param aaiEnrichmentDetails A&AI Enrichment details - * - * @return new A&AI with only top level A&AI Enrichment details - */ - private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) { - try { - final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails); - final Iterator> fieldsIterator = rootNode.fields(); - while (fieldsIterator.hasNext()) { - final Map.Entry fieldEntry = fieldsIterator.next(); - final JsonNode jsonNode = fieldEntry.getValue(); - // remove all arrays, objects from A&AI Enrichment Json - if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) { - fieldsIterator.remove(); - } - } - return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class); - } catch (IOException e) { - LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e); - } - return null; - } - -} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java new file mode 100644 index 0000000..9b3762c --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java @@ -0,0 +1,162 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.util.AnalyticsModelIOUtils; +import org.onap.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier; +import org.onap.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest; + +import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * @author Rajiv Singla . Creation Date: 10/25/2016. + */ +public abstract class BaseAnalyticsTCAUnitTest extends BaseDCAEAnalyticsUnitTest { + + /** + * Object mapper to be used for all TCA Json Parsing + */ + protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER = + Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get(); + + protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json"; + protected static final String CEF_MESSAGES_JSON_FILE_LOCATION = "data/json/cef/cef_messages.json"; + protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json"; + protected static final String CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION = + "data/json/cef/cef_message_with_threshold_violation.json"; + + protected static final String TCA_CONTROLLER_POLICY_FILE_LOCATION = + "data/properties/tca_controller_policy.properties"; + + protected static final String TCA_AAI_VNF_ENRICHMENT_FILE_LOCATION = "data/json/aai/aai_vnf_enrichment.json"; + + protected static final String TCA_TEST_APP_CONFIG_NAME = "testTCAAppName"; + protected static final String TCA_TEST_APP_CONFIG_DESCRIPTION = "testTCAAppDescription"; + protected static final String TCA_TEST_APP_CONFIG_SUBSCRIBER_OUTPUT_STREAM_NAME = + "testTcaSubscriberOutputStreamName"; + protected static final String TCA_TEST_APP_CONFIG_VES_ALERT_TABLE_NAME = "testTcaVESAlertsTableName"; + protected static final String TCA_TEST_APP_CONFIG_VES_MESSAGE_STATUS_TABLE_NAME = + "testTcaVESMessageStatusTableName"; + + + /** + * Provides TCA Policy that can be used for testing + * + * @return test TCA Policy Object + */ + protected TCAPolicy getSampleTCAPolicy() { + try { + return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(fromStream(TCA_POLICY_JSON_FILE_LOCATION), TCAPolicy.class); + } catch (IOException e) { + LOG.error("Error while parsing policy: {}", e); + throw new RuntimeException("Error while parsing policy", e); + } + } + + /** + * Provides list containing 350 CEF messages + * + * @return CEF Test Message + * @throws Exception Exception + */ + protected List getCEFMessages() throws Exception { + final String cefMessageAsString = fromStream(CEF_MESSAGES_JSON_FILE_LOCATION); + final TypeReference> eventListenerListTypeReference = + new TypeReference>() { + }; + return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageAsString, eventListenerListTypeReference); + } + + /** + * Provides 1 valid CEF messages which does not violate Threshold as String + * + * @return CEF Test Message String + * @throws Exception Exception + */ + protected String getValidCEFMessage() throws Exception { + return fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + } + + + /** + * Provides single CEF Test Message + * + * @return CEF Test Message + * @throws Exception Exception + */ + protected EventListener getCEFEventListener() throws Exception { + final String cefMessageAsString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageAsString, EventListener.class); + } + + protected static List getThresholds() { + Threshold majorThreshold = new Threshold(); + majorThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); + majorThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); + majorThreshold.setVersion("Test Version"); + majorThreshold.setThresholdValue(500L); + majorThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); + majorThreshold.setDirection(Direction.LESS_OR_EQUAL); + + Threshold criticalThreshold = new Threshold(); + criticalThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); + criticalThreshold.setThresholdValue(5000L); + criticalThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); + criticalThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); + criticalThreshold.setDirection(Direction.GREATER_OR_EQUAL); + return Arrays.asList(majorThreshold, criticalThreshold); + } + + protected static Threshold getCriticalThreshold() { + Threshold criticalThreshold = new Threshold(); + criticalThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); + criticalThreshold.setThresholdValue(5000L); + criticalThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); + criticalThreshold.setDirection(Direction.GREATER_OR_EQUAL); + return criticalThreshold; + } + + protected static Map getControllerRuntimeArguments() { + final Properties controllerProperties = + AnalyticsModelIOUtils.loadPropertiesFile(TCA_CONTROLLER_POLICY_FILE_LOCATION, new Properties()); + + final Map runtimeArgs = new LinkedHashMap<>(); + for (Map.Entry property : controllerProperties.entrySet()) { + runtimeArgs.put(property.getKey().toString(), property.getValue().toString()); + } + + return runtimeArgs; + } + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java new file mode 100644 index 0000000..1a0a2c4 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java @@ -0,0 +1,56 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +/** + * @author Rajiv Singla . Creation Date: 12/16/2016. + */ +public class AbstractTCAECEFPolicyProcessorTest extends BaseAnalyticsTCAUnitTest { + + private class DummyAbstractTCAECEFPolicyProcessor extends AbstractTCAECEFPolicyProcessor { + + @Override + public String getProcessorDescription() { + return "dummy"; + } + + @Override + public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { + return processorContext; + } + } + + + @Test(expected = MessageProcessingException.class) + public void preProcessorWhenThereIsNoCEFMessage() throws Exception { + DummyAbstractTCAECEFPolicyProcessor dummyAbstractTCAECEFPolicyProcessor = new + DummyAbstractTCAECEFPolicyProcessor(); + + final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(null, getSampleTCAPolicy()); + processorContext.setCEFEventListener(null); + dummyAbstractTCAECEFPolicyProcessor.preProcessor(processorContext); + } + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java new file mode 100644 index 0000000..b3ae9eb --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java @@ -0,0 +1,117 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +/** + * + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public class TCACEFJsonProcessorTest extends BaseAnalyticsTCAUnitTest { + + + // A valid CEF Message + @Test + public void testCEFJsonProcessorWithValidCEFMessage() throws Exception { + + final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + final TCACEFProcessorContext tcacefProcessorContext = + new TCACEFProcessorContext(cefMessageString, getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); + + final EventListener cefEventListener = finalProcessorContext.getCEFEventListener(); + + assertNotNull("CEF Event Listener must be present", cefEventListener); + + } + + // Even if message is not a valid CEF format but still a Json - Json Processor will parse it + @Test + public void testCEFJsonProcessorWithValidJson() throws Exception { + + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext( + " { \"key\" : \"value\" } ", getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); + final EventListener cefEventListener = finalProcessorContext.getCEFEventListener(); + + assertNotNull("Even if message is not a valid CEF format but a valid Json.Json Processor must be able to " + + "parse it", + cefEventListener); + } + + @Test(expected = MessageProcessingException.class) + public void testCEFJsonProcessorWithCEFMessageAsNull() throws Exception { + + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(null, getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + tcacefJsonProcessor.apply(tcacefProcessorContext); + + } + + @Test + public void testCEFJsonProcessorWithCEFMessageIsBlank() throws Exception { + + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(" ", getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); + assertFalse("Blank message must terminate processing of message chain", finalProcessorContext + .canProcessingContinue()); + } + + + @Test + public void testCEFJsonProcessorWithCEFMessageWhichIsNotValidMessage() throws Exception { + + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(" Invalid Message ", + getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); + assertFalse("Invalid message must terminate processing of message chain", finalProcessorContext + .canProcessingContinue()); + } + + + @Test(expected = MessageProcessingException.class) + public void testCEFJsonProcessorWithCEFMessageWhichIsNotValidJson() throws Exception { + + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext( + " { \"Invalid Event Listener Json\" } ", getSampleTCAPolicy()); + + TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); + tcacefJsonProcessor.apply(tcacefProcessorContext); + } + + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java new file mode 100644 index 0000000..6609128 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java @@ -0,0 +1,74 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.common.service.processor.ProcessingState; +import org.onap.dcae.apod.analytics.model.domain.cef.Domain; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 12/16/2016. + */ +public class TCACEFPolicyDomainFilterTest extends BaseAnalyticsTCAUnitTest { + + private TCACEFPolicyDomainFilter tcacefPolicyDomainFilter; + private TCACEFProcessorContext processorContext; + private EventListener cefEventListener; + + @Before + public void before() throws Exception { + tcacefPolicyDomainFilter = new TCACEFPolicyDomainFilter(); + processorContext = new TCACEFProcessorContext("", getSampleTCAPolicy()); + cefEventListener = getCEFEventListener(); + processorContext.setCEFEventListener(cefEventListener); + } + + @Test + public void testProcessMessageWhenMessageIsValid() throws Exception { + tcacefPolicyDomainFilter.processMessage(processorContext); + assertThat("Processing must finish successfully", + tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); + } + + @Test + public void testProcessMessageWhenCEFEventIsNull() throws Exception { + cefEventListener.setEvent(null); + processorContext.setCEFEventListener(cefEventListener); + tcacefPolicyDomainFilter.processMessage(processorContext); + assertThat("Processing must terminate early", + tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); + } + + @Test + public void testProcessMessageWhenPolicyDomainDoesNotMatchMessageDomain() throws Exception { + cefEventListener.getEvent().getCommonEventHeader().setDomain(Domain.other); + tcacefPolicyDomainFilter.processMessage(processorContext); + assertThat("Processing must terminate early", + tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); + } + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java new file mode 100644 index 0000000..6f01ab3 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java @@ -0,0 +1,75 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.apod.analytics.common.service.processor.ProcessingState; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * @author Rajiv Singla . Creation Date: 12/19/2016. + */ +public class TCACEFPolicyEventNameFilterTest extends BaseAnalyticsTCAUnitTest { + + private TCACEFPolicyEventNameFilter tcacefPolicyEventNameFilter; + private TCACEFProcessorContext processorContext; + private EventListener cefEventListener; + + @Before + public void before() throws Exception { + tcacefPolicyEventNameFilter = new TCACEFPolicyEventNameFilter(); + processorContext = new TCACEFProcessorContext("", getSampleTCAPolicy()); + cefEventListener = getCEFEventListener(); + processorContext.setCEFEventListener(cefEventListener); + } + + @Test + public void testProcessMessageWhenMessageIsValid() throws Exception { + tcacefPolicyEventNameFilter.processMessage(processorContext); + assertThat("Processing must finish successfully", + tcacefPolicyEventNameFilter.getProcessingState(), + is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); + } + + @Test + public void testProcessMessageWhenCEFEventIsNull() throws Exception { + cefEventListener.setEvent(null); + processorContext.setCEFEventListener(cefEventListener); + tcacefPolicyEventNameFilter.processMessage(processorContext); + assertThat("Processing must terminate early", + tcacefPolicyEventNameFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); + } + + @Test + public void testProcessMessageWhenPolicyEventNameDoesNotMatchMessageEventName() throws Exception { + cefEventListener.getEvent().getCommonEventHeader().setEventName("someNonPolicyEventName"); + tcacefPolicyEventNameFilter.processMessage(processorContext); + assertThat("Processing must terminate early", + tcacefPolicyEventNameFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); + } + + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java new file mode 100644 index 0000000..0cec690 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java @@ -0,0 +1,81 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.common.service.processor.ProcessingState; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public class TCACEFPolicyThresholdsProcessorTest extends BaseAnalyticsTCAUnitTest { + + @Test + public void testCEFPolicyThresholdProcessorWithNoThresholdViolation() throws Exception { + + final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(cefMessageString, + getSampleTCAPolicy()); + tcacefProcessorContext.setCEFEventListener(getCEFEventListener()); + + AbstractTCAECEFPolicyProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); + final TCACEFProcessorContext finalProcessorContext = policyThresholdsProcessor.apply(tcacefProcessorContext); + + assertFalse("Process Context can Processing Continue flag should be false", finalProcessorContext + .canProcessingContinue()); + assertThat("Policy Threshold Processor State must be terminated early", + policyThresholdsProcessor.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); + assertEquals("Policy must not change", getSampleTCAPolicy(), finalProcessorContext.getTCAPolicy()); + + } + + @Test + public void testCEFPolicyThresholdProcessorWithThresholdViolation() throws Exception { + + final String cefMessageString = fromStream(CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION); + final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(cefMessageString, + getSampleTCAPolicy()); + + final EventListener eventListener = ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageString, + EventListener.class); + tcacefProcessorContext.setCEFEventListener(eventListener); + + AbstractTCAECEFPolicyProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); + final TCACEFProcessorContext finalProcessorContext = policyThresholdsProcessor.apply(tcacefProcessorContext); + + assertTrue("Process Context can Processing Continue flag should be true", finalProcessorContext + .canProcessingContinue()); + assertThat("Policy Threshold Processor State must be successful", + policyThresholdsProcessor.getProcessingState(), is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); + assertEquals("Policy must not change", getSampleTCAPolicy(), finalProcessorContext.getTCAPolicy()); + + } + +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java new file mode 100644 index 0000000..1482565 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java @@ -0,0 +1,38 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.processor; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; + +/** + * + * @author Rajiv Singla . Creation Date: 11/14/2016. + */ +public class TCACEFProcessorContextTest extends BaseAnalyticsTCAUnitTest { + + @Test + public void testProcessorContextSerialization() throws Exception { + TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(getValidCEFMessage(), + getSampleTCAPolicy()); + testSerialization(tcacefProcessorContext, TCACEFProcessorContextTest.class); + } +} diff --git a/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtilsTest.java b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtilsTest.java new file mode 100644 index 0000000..e6b64d4 --- /dev/null +++ b/dcae-analytics-tca/src/test/java/org/onap/dcae/apod/analytics/tca/utils/TCAUtilsTest.java @@ -0,0 +1,418 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.tca.utils; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Table; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.common.AnalyticsConstants; +import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException; +import org.onap.dcae.apod.analytics.model.domain.cef.CommonEventHeader; +import org.onap.dcae.apod.analytics.model.domain.cef.Domain; +import org.onap.dcae.apod.analytics.model.domain.cef.Event; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SimpleTrigger; +import org.quartz.impl.StdSchedulerFactory; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author Rajiv Singla . Creation Date: 11/9/2016. + */ +public class TCAUtilsTest extends BaseAnalyticsTCAUnitTest { + + @Test + public void testGetPolicyEventNames() throws Exception { + + final TCAPolicy sampleTCAPolicy = getSampleTCAPolicy(); + final List eventNames = TCAUtils.getPolicyEventNames(sampleTCAPolicy); + + assertThat("Policy event names must contain vFirewall, vLoadBalancer, virtualVMEventName", eventNames, + containsInAnyOrder("Mfvs_eNodeB_RANKPI", "vLoadBalancer", "virtualVMEventName")); + } + + @Test + public void testGetPolicyEventNamesSupplier() throws Exception { + final TCAPolicy sampleTCAPolicy = getSampleTCAPolicy(); + final Supplier> policyEventNamesSupplier = TCAUtils.getPolicyEventNamesSupplier + (sampleTCAPolicy); + final List eventNames = policyEventNamesSupplier.get(); + assertThat("Policy event names must contain vFirewall and vLoadBalancer", eventNames, + containsInAnyOrder("Mfvs_eNodeB_RANKPI", "vLoadBalancer", "virtualVMEventName")); + } + + @Test + public void testProcessCEFMessage() throws Exception { + final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + final TCACEFProcessorContext tcacefProcessorContext = TCAUtils.filterCEFMessage(cefMessageString, + getSampleTCAPolicy()); + assertThat("TCAECEFProcessor Processor Context can continue flag is true", tcacefProcessorContext + .canProcessingContinue(), is(true)); + } + + @Test + public void testGetPolicyFRThresholdsTableSupplier() throws Exception { + final Table> policyFRThresholdPathTable = TCAUtils + .getPolicyEventNameThresholdsTableSupplier(getSampleTCAPolicy()).get(); + + final Map> eNodeBRankpi = policyFRThresholdPathTable.row("Mfvs_eNodeB_RANKPI"); + final Map> vLoadBalancer = policyFRThresholdPathTable.row("vLoadBalancer"); + + final Set eNodeBRankpiFieldPaths = eNodeBRankpi.keySet(); + final Set vLoadBalancerPaths = vLoadBalancer.keySet(); + + final String receivedBroadcastPacketsFieldPath = + "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated"; + assertThat("eNodeBRankpi threshold field path size must be " + + "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*]" + + ".receivedBroadcastPacketsAccumulated", + eNodeBRankpiFieldPaths.iterator().next(), + is(receivedBroadcastPacketsFieldPath)); + + assertThat("vLoadBalancer threshold field path size must be " + + "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*]" + + ".receivedBroadcastPacketsAccumulated", + vLoadBalancerPaths.iterator().next(), + is(receivedBroadcastPacketsFieldPath)); + + final List eNodeBRankpiThresholds = policyFRThresholdPathTable.get("Mfvs_eNodeB_RANKPI", + receivedBroadcastPacketsFieldPath); + final List vLoadBalancerThresholds = policyFRThresholdPathTable.get("vLoadBalancer", + receivedBroadcastPacketsFieldPath); + + assertThat("eNodeBRankpi Threshold size must be 3", eNodeBRankpiThresholds.size(), is(3)); + assertThat("vLoadBalancer Threshold size must be 2", vLoadBalancerThresholds.size(), is(2)); + } + + @Test + public void testGetJsonPathValueWithValidMessageAndPolicy() throws Exception { + final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + final String jsonPath = + "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated"; + final ImmutableSet fieldPaths = ImmutableSet.of(jsonPath); + final Map> jsonPathValueMap = TCAUtils.getJsonPathValue(cefMessageString, fieldPaths); + assertThat("Json Path value must match", + jsonPathValueMap.get(jsonPath).get(0), is(new BigDecimal(5000))); + + } + + @Test + public void testGetJsonPathValueWithValidPath() throws Exception { + final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); + final String jsonPath = "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].invalid"; + final ImmutableSet fieldPaths = ImmutableSet.of(jsonPath); + final Map> jsonPathValueMap = TCAUtils.getJsonPathValue(cefMessageString, fieldPaths); + assertThat("Json path value must be empty", jsonPathValueMap.size(), is(0)); + + } + + + @Test + public void testCreateNewTCAVESResponseWithVFControlLoopSchemaType() throws Exception { + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + + MetricsPerEventName metricsPerEventName = mock(MetricsPerEventName.class); + when(metricsPerEventName.getThresholds()).thenReturn(getThresholds()); + when(metricsPerEventName.getPolicyScope()).thenReturn("Test Policy scope"); + when(tcacefProcessorContext.getMetricsPerEventName()).thenReturn(metricsPerEventName); + when(metricsPerEventName.getEventName()).thenReturn("testEventName"); + when(metricsPerEventName.getControlLoopSchemaType()).thenReturn(ControlLoopSchemaType.VM); + + when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); + + //TODO : Add proper assertions, as the usage is not clearly understood + assertThat(tcaVESResponse.getClosedLoopControlName(), + is("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A")); + assertThat(tcaVESResponse.getVersion(), is("Test Version")); + assertThat(tcaVESResponse.getPolicyScope(), is("Test Policy scope")); + assertNull(tcaVESResponse.getAai().getGenericVNFName()); + assertNotNull(tcaVESResponse.getAai().getGenericServerName()); + } + + @Test + public void testCreateNewTCAVESResponseWithFunctionalRolevFirewall() throws Exception { + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + + MetricsPerEventName metricsPerEventName = mock(MetricsPerEventName.class); + when(metricsPerEventName.getThresholds()).thenReturn(getThresholds()); + when(metricsPerEventName.getPolicyScope()).thenReturn("Test Policy scope"); + when(tcacefProcessorContext.getMetricsPerEventName()).thenReturn(metricsPerEventName); + when(metricsPerEventName.getEventName()).thenReturn("vFirewall"); + + when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); + + //TODO : Add proper assertions, as the usage is not clearly understood + assertThat(tcaVESResponse.getClosedLoopControlName(), + is("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A")); + assertThat(tcaVESResponse.getVersion(), is("Test Version")); + assertThat(tcaVESResponse.getPolicyScope(), is("Test Policy scope")); + assertNotNull(tcaVESResponse.getAai().getGenericVNFName()); + assertNull(tcaVESResponse.getAai().getGenericServerName()); + + } + + @Rule + public ExpectedException expectedIllegalArgumentException = ExpectedException.none(); + + @Test + public void testCreateNewTCAVESResponseNullFunctionalRole() throws Exception { + expectedIllegalArgumentException.expect(MessageProcessingException.class); + expectedIllegalArgumentException.expectCause(isA(IllegalArgumentException.class)); + expectedIllegalArgumentException.expectMessage("No violations metrics. Unable to create VES Response"); + + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); + assertNotNull(tcaVESResponse.getClosedLoopControlName()); + } + + @Test + public void testPrioritizeThresholdViolations() throws Exception { + + Map thresholdMap = new HashMap<>(); + Threshold majorThreshold = mock(Threshold.class); + when(majorThreshold.getSeverity()).thenReturn(EventSeverity.MAJOR); + thresholdMap.put("MAJOR", majorThreshold); + + Threshold result1 = TCAUtils.prioritizeThresholdViolations(thresholdMap); + assertEquals(result1.getSeverity(), EventSeverity.MAJOR); + + Threshold criticalThreshold = mock(Threshold.class); + when(criticalThreshold.getSeverity()).thenReturn(EventSeverity.CRITICAL); + thresholdMap.put("CRITICAL", criticalThreshold); + + Threshold result2 = TCAUtils.prioritizeThresholdViolations(thresholdMap); + assertEquals(result2.getSeverity(), EventSeverity.CRITICAL); + } + + @Test + public void testCreateViolatedMetrics() throws Exception { + TCAPolicy tcaPolicy = getSampleTCAPolicy(); + Threshold violatedThreshold = getCriticalThreshold(); + String functionalRole = "Mfvs_eNodeB_RANKPI"; + MetricsPerEventName result = TCAUtils.createViolatedMetrics(tcaPolicy, violatedThreshold, functionalRole); + assertThat(result.getPolicyScope(), is("resource=vFirewall;type=configuration")); + assertThat(result.getPolicyName(), is("configuration.dcae.microservice.tca.xml")); + } + + @Test + public void testCreateViolatedMetricsWrongEventName() throws Exception { + expectedIllegalArgumentException.expect(MessageProcessingException.class); + expectedIllegalArgumentException.expectCause(isA(IllegalStateException.class)); + String eventName = "badEventName"; + expectedIllegalArgumentException.expectMessage("TCA Policy must contain eventName: " + eventName); + TCAPolicy tcaPolicy = getSampleTCAPolicy(); + Threshold violatedThreshold = getCriticalThreshold(); + TCAUtils.createViolatedMetrics(tcaPolicy, violatedThreshold, eventName); + } + + @Test + public void testGetDomainAndEventName() { + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + EventListener eventListener = mock(EventListener.class); + Event event = mock(Event.class); + CommonEventHeader commonEventHeader = mock(CommonEventHeader.class); + + Pair result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); + assertNull(result.getLeft()); + assertNull(result.getRight()); + + when(tcacefProcessorContext.getCEFEventListener()).thenReturn(eventListener); + result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); + assertNull(result.getLeft()); + assertNull(result.getRight()); + + when(eventListener.getEvent()).thenReturn(event); + result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); + assertNull(result.getLeft()); + assertNull(result.getRight()); + + when(event.getCommonEventHeader()).thenReturn(commonEventHeader); + result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); + assertNull(result.getLeft()); + assertNull(result.getRight()); + + when(commonEventHeader.getDomain()).thenReturn(Domain.other); + when(commonEventHeader.getEventName()).thenReturn("eventName"); + + result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); + assertEquals(result.getLeft(), "other"); + assertEquals(result.getRight(), "eventName"); + + } + + @Test + public void testComputeThresholdViolationsNotPresent() throws Exception { + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + when(tcacefProcessorContext.canProcessingContinue()).thenReturn(true); + when(tcacefProcessorContext.getMessage()).thenReturn(getValidCEFMessage()); + + when(tcacefProcessorContext.getTCAPolicy()).thenReturn(getSampleTCAPolicy()); + when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + + TCACEFProcessorContext result = TCAUtils.computeThresholdViolations(tcacefProcessorContext); + assertNotNull(result); + verify(result, times(0)).setMetricsPerEventName(Mockito.any(MetricsPerEventName.class)); + assertEquals("Policy must not change", getSampleTCAPolicy(), result.getTCAPolicy()); + } + + @Test + public void testComputeThresholdViolationsPresent() throws Exception { + TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); + when(tcacefProcessorContext.canProcessingContinue()).thenReturn(true); + final String cefMessageString = fromStream(CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION); + when(tcacefProcessorContext.getMessage()).thenReturn(cefMessageString); + + when(tcacefProcessorContext.getTCAPolicy()).thenReturn(getSampleTCAPolicy()); + when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + + TCACEFProcessorContext result = TCAUtils.computeThresholdViolations(tcacefProcessorContext); + verify(result, times(1)).setMetricsPerEventName(Mockito.any(MetricsPerEventName.class)); + + assertEquals("Policy must not change", getSampleTCAPolicy(), result.getTCAPolicy()); + } + + + @Test + public void testCreateTCAPolicyMetricsPerKeyName() throws Exception { + + final Map tcaPolicyMap = TCAUtils.filterMapByKeyNamePrefix(getControllerRuntimeArguments(), + AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH); + + // determine functional Roles + final Map> functionalRolesMap = + TCAUtils.extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); + + final List tcaPolicyMetricsPerEventNameList = + TCAUtils.createTCAPolicyMetricsPerEventNameList(functionalRolesMap); + + assertThat("There are two Metrics per function role", 2, + is(tcaPolicyMetricsPerEventNameList.size())); + } + + + @Test + public void testCreateQuartzScheduler() throws Exception { + final Scheduler scheduler = Mockito.mock(Scheduler.class); + final StdSchedulerFactory stdSchedulerFactory = Mockito.mock(StdSchedulerFactory.class); + when(stdSchedulerFactory.getScheduler()).thenReturn(scheduler); + final JobDataMap jobDataMap = Mockito.mock(JobDataMap.class); + TCAUtils.createQuartzScheduler(1000, stdSchedulerFactory, + "data/properties/quartz-test.properties", jobDataMap, Job.class, + "testJob", "testTigger"); + verify(scheduler, times(1)) + .scheduleJob(Mockito.any(JobDetail.class), Mockito.any(SimpleTrigger.class)); + } + + + @Test + public void testCreateTCAAlertStringWhenCEFIsEnabled() throws Exception { + final MetricsPerEventName violatedMetrics = createViolatedMetricsPerEventName(EventSeverity.CRITICAL); + TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); + when(processorContext.getMetricsPerEventName()).thenReturn(violatedMetrics); + when(processorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + final String alertString = TCAUtils.createTCAAlertString(processorContext, "testApp", true); + assertTrue(alertString.contains("thresholdCrossingAlertFields")); + } + + @Test(expected = MessageProcessingException.class) + public void testCreateTCAAlertStringWhenViolatedMetricsNotPresentAndCEFIsEnabled() throws Exception { + TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); + when(processorContext.getMetricsPerEventName()).thenReturn(null); + TCAUtils.createTCAAlertString(processorContext, "testApp", true); + } + + @Test + public void testCreateTCAAlertStringWhenCEFIsDisabled() throws Exception { + final MetricsPerEventName violatedMetrics = createViolatedMetricsPerEventName(EventSeverity.MAJOR); + TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); + when(processorContext.getMetricsPerEventName()).thenReturn(violatedMetrics); + when(processorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); + final String alertString = TCAUtils.createTCAAlertString(processorContext, "testApp", false); + assertFalse(alertString.contains("thresholdCrossingAlertFields")); + } + + @Test(expected = MessageProcessingException.class) + public void testCreateTCAAlertStringWhenViolatedMetricsNotPresentAndCEFIsDisabled() throws Exception { + TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); + when(processorContext.getMetricsPerEventName()).thenReturn(null); + TCAUtils.createTCAAlertString(processorContext, "testApp", false); + } + + private static MetricsPerEventName createViolatedMetricsPerEventName(EventSeverity severity) { + final Threshold violatedThreshold = new Threshold(); + violatedThreshold.setSeverity(severity); + violatedThreshold.setDirection(Direction.GREATER); + violatedThreshold.setClosedLoopControlName("violatedThresholdClosedLoopName"); + violatedThreshold.setActualFieldValue(new BigDecimal(100L)); + violatedThreshold.setFieldPath("violatedThresholdFieldPath"); + violatedThreshold.setVersion("violatedThresholdVersion"); + violatedThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); + violatedThreshold.setThresholdValue(50L); + + final MetricsPerEventName violatedMetrics = new MetricsPerEventName(); + violatedMetrics.setPolicyName("violatePolicyName"); + violatedMetrics.setPolicyVersion("violatedPolicyVersion"); + violatedMetrics.setPolicyScope("violatedPolicyScope"); + violatedMetrics.setEventName("violatedEventName"); + violatedMetrics.setThresholds(Arrays.asList(violatedThreshold)); + return violatedMetrics; + } +} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java deleted file mode 100644 index ba3b6bb..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/BaseAnalyticsTCAUnitTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils; -import org.openecomp.dcae.apod.analytics.model.util.json.AnalyticsModelObjectMapperSupplier; -import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest; - -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * @author Rajiv Singla . Creation Date: 10/25/2016. - */ -public abstract class BaseAnalyticsTCAUnitTest extends BaseDCAEAnalyticsUnitTest { - - /** - * Object mapper to be used for all TCA Json Parsing - */ - protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER = - Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get(); - - protected static final String TCA_POLICY_JSON_FILE_LOCATION = "data/json/policy/tca_policy.json"; - protected static final String CEF_MESSAGES_JSON_FILE_LOCATION = "data/json/cef/cef_messages.json"; - protected static final String CEF_MESSAGE_JSON_FILE_LOCATION = "data/json/cef/cef_message.json"; - protected static final String CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION = - "data/json/cef/cef_message_with_threshold_violation.json"; - - protected static final String TCA_CONTROLLER_POLICY_FILE_LOCATION = - "data/properties/tca_controller_policy.properties"; - - protected static final String TCA_AAI_VNF_ENRICHMENT_FILE_LOCATION = "data/json/aai/aai_vnf_enrichment.json"; - - protected static final String TCA_TEST_APP_CONFIG_NAME = "testTCAAppName"; - protected static final String TCA_TEST_APP_CONFIG_DESCRIPTION = "testTCAAppDescription"; - protected static final String TCA_TEST_APP_CONFIG_SUBSCRIBER_OUTPUT_STREAM_NAME = - "testTcaSubscriberOutputStreamName"; - protected static final String TCA_TEST_APP_CONFIG_VES_ALERT_TABLE_NAME = "testTcaVESAlertsTableName"; - protected static final String TCA_TEST_APP_CONFIG_VES_MESSAGE_STATUS_TABLE_NAME = - "testTcaVESMessageStatusTableName"; - - - /** - * Provides TCA Policy that can be used for testing - * - * @return test TCA Policy Object - */ - protected TCAPolicy getSampleTCAPolicy() { - try { - return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(fromStream(TCA_POLICY_JSON_FILE_LOCATION), TCAPolicy.class); - } catch (IOException e) { - LOG.error("Error while parsing policy: {}", e); - throw new RuntimeException("Error while parsing policy", e); - } - } - - /** - * Provides list containing 350 CEF messages - * - * @return CEF Test Message - * @throws Exception Exception - */ - protected List getCEFMessages() throws Exception { - final String cefMessageAsString = fromStream(CEF_MESSAGES_JSON_FILE_LOCATION); - final TypeReference> eventListenerListTypeReference = - new TypeReference>() { - }; - return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageAsString, eventListenerListTypeReference); - } - - /** - * Provides 1 valid CEF messages which does not violate Threshold as String - * - * @return CEF Test Message String - * @throws Exception Exception - */ - protected String getValidCEFMessage() throws Exception { - return fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - } - - - /** - * Provides single CEF Test Message - * - * @return CEF Test Message - * @throws Exception Exception - */ - protected EventListener getCEFEventListener() throws Exception { - final String cefMessageAsString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - return ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageAsString, EventListener.class); - } - - protected static List getThresholds() { - Threshold majorThreshold = new Threshold(); - majorThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); - majorThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); - majorThreshold.setVersion("Test Version"); - majorThreshold.setThresholdValue(500L); - majorThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); - majorThreshold.setDirection(Direction.LESS_OR_EQUAL); - - Threshold criticalThreshold = new Threshold(); - criticalThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); - criticalThreshold.setThresholdValue(5000L); - criticalThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); - criticalThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); - criticalThreshold.setDirection(Direction.GREATER_OR_EQUAL); - return Arrays.asList(majorThreshold, criticalThreshold); - } - - protected static Threshold getCriticalThreshold() { - Threshold criticalThreshold = new Threshold(); - criticalThreshold.setClosedLoopControlName("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A"); - criticalThreshold.setThresholdValue(5000L); - criticalThreshold.setFieldPath("$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn"); - criticalThreshold.setDirection(Direction.GREATER_OR_EQUAL); - return criticalThreshold; - } - - protected static Map getControllerRuntimeArguments() { - final Properties controllerProperties = - AnalyticsModelIOUtils.loadPropertiesFile(TCA_CONTROLLER_POLICY_FILE_LOCATION, new Properties()); - - final Map runtimeArgs = new LinkedHashMap<>(); - for (Map.Entry property : controllerProperties.entrySet()) { - runtimeArgs.put(property.getKey().toString(), property.getValue().toString()); - } - - return runtimeArgs; - } - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java deleted file mode 100644 index a70f5ee..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessorTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -/** - * @author Rajiv Singla . Creation Date: 12/16/2016. - */ -public class AbstractTCAECEFPolicyProcessorTest extends BaseAnalyticsTCAUnitTest { - - private class DummyAbstractTCAECEFPolicyProcessor extends AbstractTCAECEFPolicyProcessor { - - @Override - public String getProcessorDescription() { - return "dummy"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - return processorContext; - } - } - - - @Test(expected = MessageProcessingException.class) - public void preProcessorWhenThereIsNoCEFMessage() throws Exception { - DummyAbstractTCAECEFPolicyProcessor dummyAbstractTCAECEFPolicyProcessor = new - DummyAbstractTCAECEFPolicyProcessor(); - - final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(null, getSampleTCAPolicy()); - processorContext.setCEFEventListener(null); - dummyAbstractTCAECEFPolicyProcessor.preProcessor(processorContext); - } - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java deleted file mode 100644 index 06fff5d..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessorTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; - -/** - * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFJsonProcessorTest extends BaseAnalyticsTCAUnitTest { - - - // A valid CEF Message - @Test - public void testCEFJsonProcessorWithValidCEFMessage() throws Exception { - - final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - final TCACEFProcessorContext tcacefProcessorContext = - new TCACEFProcessorContext(cefMessageString, getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); - - final EventListener cefEventListener = finalProcessorContext.getCEFEventListener(); - - assertNotNull("CEF Event Listener must be present", cefEventListener); - - } - - // Even if message is not a valid CEF format but still a Json - Json Processor will parse it - @Test - public void testCEFJsonProcessorWithValidJson() throws Exception { - - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext( - " { \"key\" : \"value\" } ", getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); - final EventListener cefEventListener = finalProcessorContext.getCEFEventListener(); - - assertNotNull("Even if message is not a valid CEF format but a valid Json.Json Processor must be able to " + - "parse it", - cefEventListener); - } - - @Test(expected = MessageProcessingException.class) - public void testCEFJsonProcessorWithCEFMessageAsNull() throws Exception { - - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(null, getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - tcacefJsonProcessor.apply(tcacefProcessorContext); - - } - - @Test - public void testCEFJsonProcessorWithCEFMessageIsBlank() throws Exception { - - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(" ", getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); - assertFalse("Blank message must terminate processing of message chain", finalProcessorContext - .canProcessingContinue()); - } - - - @Test - public void testCEFJsonProcessorWithCEFMessageWhichIsNotValidMessage() throws Exception { - - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(" Invalid Message ", - getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - final TCACEFProcessorContext finalProcessorContext = tcacefJsonProcessor.apply(tcacefProcessorContext); - assertFalse("Invalid message must terminate processing of message chain", finalProcessorContext - .canProcessingContinue()); - } - - - @Test(expected = MessageProcessingException.class) - public void testCEFJsonProcessorWithCEFMessageWhichIsNotValidJson() throws Exception { - - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext( - " { \"Invalid Event Listener Json\" } ", getSampleTCAPolicy()); - - TCACEFJsonProcessor tcacefJsonProcessor = new TCACEFJsonProcessor(); - tcacefJsonProcessor.apply(tcacefProcessorContext); - } - - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java deleted file mode 100644 index 29c3bac..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilterTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Before; -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessingState; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -/** - * @author Rajiv Singla . Creation Date: 12/16/2016. - */ -public class TCACEFPolicyDomainFilterTest extends BaseAnalyticsTCAUnitTest { - - private TCACEFPolicyDomainFilter tcacefPolicyDomainFilter; - private TCACEFProcessorContext processorContext; - private EventListener cefEventListener; - - @Before - public void before() throws Exception { - tcacefPolicyDomainFilter = new TCACEFPolicyDomainFilter(); - processorContext = new TCACEFProcessorContext("", getSampleTCAPolicy()); - cefEventListener = getCEFEventListener(); - processorContext.setCEFEventListener(cefEventListener); - } - - @Test - public void testProcessMessageWhenMessageIsValid() throws Exception { - tcacefPolicyDomainFilter.processMessage(processorContext); - assertThat("Processing must finish successfully", - tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); - } - - @Test - public void testProcessMessageWhenCEFEventIsNull() throws Exception { - cefEventListener.setEvent(null); - processorContext.setCEFEventListener(cefEventListener); - tcacefPolicyDomainFilter.processMessage(processorContext); - assertThat("Processing must terminate early", - tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); - } - - @Test - public void testProcessMessageWhenPolicyDomainDoesNotMatchMessageDomain() throws Exception { - cefEventListener.getEvent().getCommonEventHeader().setDomain(Domain.other); - tcacefPolicyDomainFilter.processMessage(processorContext); - assertThat("Processing must terminate early", - tcacefPolicyDomainFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); - } - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java deleted file mode 100644 index 2a9da75..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilterTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Before; -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessingState; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - -/** - * @author Rajiv Singla . Creation Date: 12/19/2016. - */ -public class TCACEFPolicyEventNameFilterTest extends BaseAnalyticsTCAUnitTest { - - private TCACEFPolicyEventNameFilter tcacefPolicyEventNameFilter; - private TCACEFProcessorContext processorContext; - private EventListener cefEventListener; - - @Before - public void before() throws Exception { - tcacefPolicyEventNameFilter = new TCACEFPolicyEventNameFilter(); - processorContext = new TCACEFProcessorContext("", getSampleTCAPolicy()); - cefEventListener = getCEFEventListener(); - processorContext.setCEFEventListener(cefEventListener); - } - - @Test - public void testProcessMessageWhenMessageIsValid() throws Exception { - tcacefPolicyEventNameFilter.processMessage(processorContext); - assertThat("Processing must finish successfully", - tcacefPolicyEventNameFilter.getProcessingState(), - is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); - } - - @Test - public void testProcessMessageWhenCEFEventIsNull() throws Exception { - cefEventListener.setEvent(null); - processorContext.setCEFEventListener(cefEventListener); - tcacefPolicyEventNameFilter.processMessage(processorContext); - assertThat("Processing must terminate early", - tcacefPolicyEventNameFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); - } - - @Test - public void testProcessMessageWhenPolicyEventNameDoesNotMatchMessageEventName() throws Exception { - cefEventListener.getEvent().getCommonEventHeader().setEventName("someNonPolicyEventName"); - tcacefPolicyEventNameFilter.processMessage(processorContext); - assertThat("Processing must terminate early", - tcacefPolicyEventNameFilter.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); - } - - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java deleted file mode 100644 index 912fe6f..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessorTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessingState; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** - * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFPolicyThresholdsProcessorTest extends BaseAnalyticsTCAUnitTest { - - @Test - public void testCEFPolicyThresholdProcessorWithNoThresholdViolation() throws Exception { - - final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(cefMessageString, - getSampleTCAPolicy()); - tcacefProcessorContext.setCEFEventListener(getCEFEventListener()); - - AbstractTCAECEFPolicyProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); - final TCACEFProcessorContext finalProcessorContext = policyThresholdsProcessor.apply(tcacefProcessorContext); - - assertFalse("Process Context can Processing Continue flag should be false", finalProcessorContext - .canProcessingContinue()); - assertThat("Policy Threshold Processor State must be terminated early", - policyThresholdsProcessor.getProcessingState(), is(ProcessingState.PROCESSING_TERMINATED_EARLY)); - assertEquals("Policy must not change", getSampleTCAPolicy(), finalProcessorContext.getTCAPolicy()); - - } - - @Test - public void testCEFPolicyThresholdProcessorWithThresholdViolation() throws Exception { - - final String cefMessageString = fromStream(CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION); - final TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(cefMessageString, - getSampleTCAPolicy()); - - final EventListener eventListener = ANALYTICS_MODEL_OBJECT_MAPPER.readValue(cefMessageString, - EventListener.class); - tcacefProcessorContext.setCEFEventListener(eventListener); - - AbstractTCAECEFPolicyProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); - final TCACEFProcessorContext finalProcessorContext = policyThresholdsProcessor.apply(tcacefProcessorContext); - - assertTrue("Process Context can Processing Continue flag should be true", finalProcessorContext - .canProcessingContinue()); - assertThat("Policy Threshold Processor State must be successful", - policyThresholdsProcessor.getProcessingState(), is(ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY)); - assertEquals("Policy must not change", getSampleTCAPolicy(), finalProcessorContext.getTCAPolicy()); - - } - -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java deleted file mode 100644 index 77b73cd..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContextTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.junit.Test; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; - -/** - * - * @author Rajiv Singla . Creation Date: 11/14/2016. - */ -public class TCACEFProcessorContextTest extends BaseAnalyticsTCAUnitTest { - - @Test - public void testProcessorContextSerialization() throws Exception { - TCACEFProcessorContext tcacefProcessorContext = new TCACEFProcessorContext(getValidCEFMessage(), - getSampleTCAPolicy()); - testSerialization(tcacefProcessorContext, TCACEFProcessorContextTest.class); - } -} diff --git a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtilsTest.java b/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtilsTest.java deleted file mode 100644 index c218de6..0000000 --- a/dcae-analytics-tca/src/test/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtilsTest.java +++ /dev/null @@ -1,418 +0,0 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.utils; - -import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Table; -import org.apache.commons.lang3.tuple.Pair; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.Mockito; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Event; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; -import org.openecomp.dcae.apod.analytics.tca.BaseAnalyticsTCAUnitTest; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; -import org.quartz.Job; -import org.quartz.JobDataMap; -import org.quartz.JobDetail; -import org.quartz.Scheduler; -import org.quartz.SimpleTrigger; -import org.quartz.impl.StdSchedulerFactory; - -import java.math.BigDecimal; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.isA; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCAUtilsTest extends BaseAnalyticsTCAUnitTest { - - @Test - public void testGetPolicyEventNames() throws Exception { - - final TCAPolicy sampleTCAPolicy = getSampleTCAPolicy(); - final List eventNames = TCAUtils.getPolicyEventNames(sampleTCAPolicy); - - assertThat("Policy event names must contain vFirewall, vLoadBalancer, virtualVMEventName", eventNames, - containsInAnyOrder("Mfvs_eNodeB_RANKPI", "vLoadBalancer", "virtualVMEventName")); - } - - @Test - public void testGetPolicyEventNamesSupplier() throws Exception { - final TCAPolicy sampleTCAPolicy = getSampleTCAPolicy(); - final Supplier> policyEventNamesSupplier = TCAUtils.getPolicyEventNamesSupplier - (sampleTCAPolicy); - final List eventNames = policyEventNamesSupplier.get(); - assertThat("Policy event names must contain vFirewall and vLoadBalancer", eventNames, - containsInAnyOrder("Mfvs_eNodeB_RANKPI", "vLoadBalancer", "virtualVMEventName")); - } - - @Test - public void testProcessCEFMessage() throws Exception { - final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - final TCACEFProcessorContext tcacefProcessorContext = TCAUtils.filterCEFMessage(cefMessageString, - getSampleTCAPolicy()); - assertThat("TCAECEFProcessor Processor Context can continue flag is true", tcacefProcessorContext - .canProcessingContinue(), is(true)); - } - - @Test - public void testGetPolicyFRThresholdsTableSupplier() throws Exception { - final Table> policyFRThresholdPathTable = TCAUtils - .getPolicyEventNameThresholdsTableSupplier(getSampleTCAPolicy()).get(); - - final Map> eNodeBRankpi = policyFRThresholdPathTable.row("Mfvs_eNodeB_RANKPI"); - final Map> vLoadBalancer = policyFRThresholdPathTable.row("vLoadBalancer"); - - final Set eNodeBRankpiFieldPaths = eNodeBRankpi.keySet(); - final Set vLoadBalancerPaths = vLoadBalancer.keySet(); - - final String receivedBroadcastPacketsFieldPath = - "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated"; - assertThat("eNodeBRankpi threshold field path size must be " + - "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*]" + - ".receivedBroadcastPacketsAccumulated", - eNodeBRankpiFieldPaths.iterator().next(), - is(receivedBroadcastPacketsFieldPath)); - - assertThat("vLoadBalancer threshold field path size must be " + - "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*]" + - ".receivedBroadcastPacketsAccumulated", - vLoadBalancerPaths.iterator().next(), - is(receivedBroadcastPacketsFieldPath)); - - final List eNodeBRankpiThresholds = policyFRThresholdPathTable.get("Mfvs_eNodeB_RANKPI", - receivedBroadcastPacketsFieldPath); - final List vLoadBalancerThresholds = policyFRThresholdPathTable.get("vLoadBalancer", - receivedBroadcastPacketsFieldPath); - - assertThat("eNodeBRankpi Threshold size must be 3", eNodeBRankpiThresholds.size(), is(3)); - assertThat("vLoadBalancer Threshold size must be 2", vLoadBalancerThresholds.size(), is(2)); - } - - @Test - public void testGetJsonPathValueWithValidMessageAndPolicy() throws Exception { - final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - final String jsonPath = - "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated"; - final ImmutableSet fieldPaths = ImmutableSet.of(jsonPath); - final Map> jsonPathValueMap = TCAUtils.getJsonPathValue(cefMessageString, fieldPaths); - assertThat("Json Path value must match", - jsonPathValueMap.get(jsonPath).get(0), is(new BigDecimal(5000))); - - } - - @Test - public void testGetJsonPathValueWithValidPath() throws Exception { - final String cefMessageString = fromStream(CEF_MESSAGE_JSON_FILE_LOCATION); - final String jsonPath = "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].invalid"; - final ImmutableSet fieldPaths = ImmutableSet.of(jsonPath); - final Map> jsonPathValueMap = TCAUtils.getJsonPathValue(cefMessageString, fieldPaths); - assertThat("Json path value must be empty", jsonPathValueMap.size(), is(0)); - - } - - - @Test - public void testCreateNewTCAVESResponseWithVFControlLoopSchemaType() throws Exception { - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - - MetricsPerEventName metricsPerEventName = mock(MetricsPerEventName.class); - when(metricsPerEventName.getThresholds()).thenReturn(getThresholds()); - when(metricsPerEventName.getPolicyScope()).thenReturn("Test Policy scope"); - when(tcacefProcessorContext.getMetricsPerEventName()).thenReturn(metricsPerEventName); - when(metricsPerEventName.getEventName()).thenReturn("testEventName"); - when(metricsPerEventName.getControlLoopSchemaType()).thenReturn(ControlLoopSchemaType.VM); - - when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); - - //TODO : Add proper assertions, as the usage is not clearly understood - assertThat(tcaVESResponse.getClosedLoopControlName(), - is("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A")); - assertThat(tcaVESResponse.getVersion(), is("Test Version")); - assertThat(tcaVESResponse.getPolicyScope(), is("Test Policy scope")); - assertNull(tcaVESResponse.getAai().getGenericVNFName()); - assertNotNull(tcaVESResponse.getAai().getGenericServerName()); - } - - @Test - public void testCreateNewTCAVESResponseWithFunctionalRolevFirewall() throws Exception { - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - - MetricsPerEventName metricsPerEventName = mock(MetricsPerEventName.class); - when(metricsPerEventName.getThresholds()).thenReturn(getThresholds()); - when(metricsPerEventName.getPolicyScope()).thenReturn("Test Policy scope"); - when(tcacefProcessorContext.getMetricsPerEventName()).thenReturn(metricsPerEventName); - when(metricsPerEventName.getEventName()).thenReturn("vFirewall"); - - when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); - - //TODO : Add proper assertions, as the usage is not clearly understood - assertThat(tcaVESResponse.getClosedLoopControlName(), - is("CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A")); - assertThat(tcaVESResponse.getVersion(), is("Test Version")); - assertThat(tcaVESResponse.getPolicyScope(), is("Test Policy scope")); - assertNotNull(tcaVESResponse.getAai().getGenericVNFName()); - assertNull(tcaVESResponse.getAai().getGenericServerName()); - - } - - @Rule - public ExpectedException expectedIllegalArgumentException = ExpectedException.none(); - - @Test - public void testCreateNewTCAVESResponseNullFunctionalRole() throws Exception { - expectedIllegalArgumentException.expect(MessageProcessingException.class); - expectedIllegalArgumentException.expectCause(isA(IllegalArgumentException.class)); - expectedIllegalArgumentException.expectMessage("No violations metrics. Unable to create VES Response"); - - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - TCAVESResponse tcaVESResponse = TCAUtils.createNewTCAVESResponse(tcacefProcessorContext, "TCA_APP_NAME"); - assertNotNull(tcaVESResponse.getClosedLoopControlName()); - } - - @Test - public void testPrioritizeThresholdViolations() throws Exception { - - Map thresholdMap = new HashMap<>(); - Threshold majorThreshold = mock(Threshold.class); - when(majorThreshold.getSeverity()).thenReturn(EventSeverity.MAJOR); - thresholdMap.put("MAJOR", majorThreshold); - - Threshold result1 = TCAUtils.prioritizeThresholdViolations(thresholdMap); - assertEquals(result1.getSeverity(), EventSeverity.MAJOR); - - Threshold criticalThreshold = mock(Threshold.class); - when(criticalThreshold.getSeverity()).thenReturn(EventSeverity.CRITICAL); - thresholdMap.put("CRITICAL", criticalThreshold); - - Threshold result2 = TCAUtils.prioritizeThresholdViolations(thresholdMap); - assertEquals(result2.getSeverity(), EventSeverity.CRITICAL); - } - - @Test - public void testCreateViolatedMetrics() throws Exception { - TCAPolicy tcaPolicy = getSampleTCAPolicy(); - Threshold violatedThreshold = getCriticalThreshold(); - String functionalRole = "Mfvs_eNodeB_RANKPI"; - MetricsPerEventName result = TCAUtils.createViolatedMetrics(tcaPolicy, violatedThreshold, functionalRole); - assertThat(result.getPolicyScope(), is("resource=vFirewall;type=configuration")); - assertThat(result.getPolicyName(), is("configuration.dcae.microservice.tca.xml")); - } - - @Test - public void testCreateViolatedMetricsWrongEventName() throws Exception { - expectedIllegalArgumentException.expect(MessageProcessingException.class); - expectedIllegalArgumentException.expectCause(isA(IllegalStateException.class)); - String eventName = "badEventName"; - expectedIllegalArgumentException.expectMessage("TCA Policy must contain eventName: " + eventName); - TCAPolicy tcaPolicy = getSampleTCAPolicy(); - Threshold violatedThreshold = getCriticalThreshold(); - TCAUtils.createViolatedMetrics(tcaPolicy, violatedThreshold, eventName); - } - - @Test - public void testGetDomainAndEventName() { - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - EventListener eventListener = mock(EventListener.class); - Event event = mock(Event.class); - CommonEventHeader commonEventHeader = mock(CommonEventHeader.class); - - Pair result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); - assertNull(result.getLeft()); - assertNull(result.getRight()); - - when(tcacefProcessorContext.getCEFEventListener()).thenReturn(eventListener); - result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); - assertNull(result.getLeft()); - assertNull(result.getRight()); - - when(eventListener.getEvent()).thenReturn(event); - result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); - assertNull(result.getLeft()); - assertNull(result.getRight()); - - when(event.getCommonEventHeader()).thenReturn(commonEventHeader); - result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); - assertNull(result.getLeft()); - assertNull(result.getRight()); - - when(commonEventHeader.getDomain()).thenReturn(Domain.other); - when(commonEventHeader.getEventName()).thenReturn("eventName"); - - result = TCAUtils.getDomainAndEventName(tcacefProcessorContext); - assertEquals(result.getLeft(), "other"); - assertEquals(result.getRight(), "eventName"); - - } - - @Test - public void testComputeThresholdViolationsNotPresent() throws Exception { - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - when(tcacefProcessorContext.canProcessingContinue()).thenReturn(true); - when(tcacefProcessorContext.getMessage()).thenReturn(getValidCEFMessage()); - - when(tcacefProcessorContext.getTCAPolicy()).thenReturn(getSampleTCAPolicy()); - when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - - TCACEFProcessorContext result = TCAUtils.computeThresholdViolations(tcacefProcessorContext); - assertNotNull(result); - verify(result, times(0)).setMetricsPerEventName(Mockito.any(MetricsPerEventName.class)); - assertEquals("Policy must not change", getSampleTCAPolicy(), result.getTCAPolicy()); - } - - @Test - public void testComputeThresholdViolationsPresent() throws Exception { - TCACEFProcessorContext tcacefProcessorContext = mock(TCACEFProcessorContext.class); - when(tcacefProcessorContext.canProcessingContinue()).thenReturn(true); - final String cefMessageString = fromStream(CEF_MESSAGE_WITH_THRESHOLD_VIOLATION_JSON_FILE_LOCATION); - when(tcacefProcessorContext.getMessage()).thenReturn(cefMessageString); - - when(tcacefProcessorContext.getTCAPolicy()).thenReturn(getSampleTCAPolicy()); - when(tcacefProcessorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - - TCACEFProcessorContext result = TCAUtils.computeThresholdViolations(tcacefProcessorContext); - verify(result, times(1)).setMetricsPerEventName(Mockito.any(MetricsPerEventName.class)); - - assertEquals("Policy must not change", getSampleTCAPolicy(), result.getTCAPolicy()); - } - - - @Test - public void testCreateTCAPolicyMetricsPerKeyName() throws Exception { - - final Map tcaPolicyMap = TCAUtils.filterMapByKeyNamePrefix(getControllerRuntimeArguments(), - AnalyticsConstants.TCA_POLICY_METRICS_PER_FUNCTIONAL_ROLE_PATH); - - // determine functional Roles - final Map> functionalRolesMap = - TCAUtils.extractSubTree(tcaPolicyMap, 2, 3, AnalyticsConstants.TCA_POLICY_DELIMITER); - - final List tcaPolicyMetricsPerEventNameList = - TCAUtils.createTCAPolicyMetricsPerEventNameList(functionalRolesMap); - - assertThat("There are two Metrics per function role", 2, - is(tcaPolicyMetricsPerEventNameList.size())); - } - - - @Test - public void testCreateQuartzScheduler() throws Exception { - final Scheduler scheduler = Mockito.mock(Scheduler.class); - final StdSchedulerFactory stdSchedulerFactory = Mockito.mock(StdSchedulerFactory.class); - when(stdSchedulerFactory.getScheduler()).thenReturn(scheduler); - final JobDataMap jobDataMap = Mockito.mock(JobDataMap.class); - TCAUtils.createQuartzScheduler(1000, stdSchedulerFactory, - "data/properties/quartz-test.properties", jobDataMap, Job.class, - "testJob", "testTigger"); - verify(scheduler, times(1)) - .scheduleJob(Mockito.any(JobDetail.class), Mockito.any(SimpleTrigger.class)); - } - - - @Test - public void testCreateTCAAlertStringWhenCEFIsEnabled() throws Exception { - final MetricsPerEventName violatedMetrics = createViolatedMetricsPerEventName(EventSeverity.CRITICAL); - TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); - when(processorContext.getMetricsPerEventName()).thenReturn(violatedMetrics); - when(processorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - final String alertString = TCAUtils.createTCAAlertString(processorContext, "testApp", true); - assertTrue(alertString.contains("thresholdCrossingAlertFields")); - } - - @Test(expected = MessageProcessingException.class) - public void testCreateTCAAlertStringWhenViolatedMetricsNotPresentAndCEFIsEnabled() throws Exception { - TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); - when(processorContext.getMetricsPerEventName()).thenReturn(null); - TCAUtils.createTCAAlertString(processorContext, "testApp", true); - } - - @Test - public void testCreateTCAAlertStringWhenCEFIsDisabled() throws Exception { - final MetricsPerEventName violatedMetrics = createViolatedMetricsPerEventName(EventSeverity.MAJOR); - TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); - when(processorContext.getMetricsPerEventName()).thenReturn(violatedMetrics); - when(processorContext.getCEFEventListener()).thenReturn(getCEFEventListener()); - final String alertString = TCAUtils.createTCAAlertString(processorContext, "testApp", false); - assertFalse(alertString.contains("thresholdCrossingAlertFields")); - } - - @Test(expected = MessageProcessingException.class) - public void testCreateTCAAlertStringWhenViolatedMetricsNotPresentAndCEFIsDisabled() throws Exception { - TCACEFProcessorContext processorContext = mock(TCACEFProcessorContext.class); - when(processorContext.getMetricsPerEventName()).thenReturn(null); - TCAUtils.createTCAAlertString(processorContext, "testApp", false); - } - - private static MetricsPerEventName createViolatedMetricsPerEventName(EventSeverity severity) { - final Threshold violatedThreshold = new Threshold(); - violatedThreshold.setSeverity(severity); - violatedThreshold.setDirection(Direction.GREATER); - violatedThreshold.setClosedLoopControlName("violatedThresholdClosedLoopName"); - violatedThreshold.setActualFieldValue(new BigDecimal(100L)); - violatedThreshold.setFieldPath("violatedThresholdFieldPath"); - violatedThreshold.setVersion("violatedThresholdVersion"); - violatedThreshold.setClosedLoopEventStatus(ClosedLoopEventStatus.ONSET); - violatedThreshold.setThresholdValue(50L); - - final MetricsPerEventName violatedMetrics = new MetricsPerEventName(); - violatedMetrics.setPolicyName("violatePolicyName"); - violatedMetrics.setPolicyVersion("violatedPolicyVersion"); - violatedMetrics.setPolicyScope("violatedPolicyScope"); - violatedMetrics.setEventName("violatedEventName"); - violatedMetrics.setThresholds(Arrays.asList(violatedThreshold)); - return violatedMetrics; - } -} diff --git a/dcae-analytics-tca/src/test/resources/logback-test.xml b/dcae-analytics-tca/src/test/resources/logback-test.xml index 4857522..78cbdfa 100644 --- a/dcae-analytics-tca/src/test/resources/logback-test.xml +++ b/dcae-analytics-tca/src/test/resources/logback-test.xml @@ -1,55 +1,55 @@ - - - - - - - - - - - - - - - - - - - - - - - - - %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n + + + + + + + + + + -- cgit 1.2.3-korg