summaryrefslogtreecommitdiffstats
path: root/dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java')
-rw-r--r--dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java270
1 files changed, 270 insertions, 0 deletions
diff --git a/dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java b/dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java
new file mode 100644
index 0000000..053fcab
--- /dev/null
+++ b/dcae-analytics/dcae-analytics-tca-core/src/main/java/org/onap/dcae/analytics/tca/core/util/function/calculation/TcaThresholdViolationCalculator.java
@@ -0,0 +1,270 @@
+/*
+ * ================================================================================
+ * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+
+package org.onap.dcae.analytics.tca.core.util.function.calculation;
+
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.TypeRef;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+import org.onap.dcae.analytics.model.cef.CommonEventHeader;
+import org.onap.dcae.analytics.model.cef.Event;
+import org.onap.dcae.analytics.model.cef.EventListener;
+import org.onap.dcae.analytics.tca.core.exception.TcaProcessingException;
+import org.onap.dcae.analytics.tca.core.service.TcaExecutionContext;
+import org.onap.dcae.analytics.tca.core.service.TcaResultContext;
+import org.onap.dcae.analytics.tca.model.policy.MetricsPerEventName;
+import org.onap.dcae.analytics.tca.model.policy.TcaPolicy;
+import org.onap.dcae.analytics.tca.model.policy.Threshold;
+
+/**
+ * @author Rajiv Singla
+ */
+public class TcaThresholdViolationCalculator implements TcaCalculationFunction {
+
+ @Override
+ public TcaExecutionContext calculate(final TcaExecutionContext tcaExecutionContext) {
+
+ final String cefMessage = tcaExecutionContext.getCefMessage();
+ final EventListener eventListener = tcaExecutionContext.getTcaProcessingContext().getEventListener();
+ final TcaPolicy tcaPolicy = tcaExecutionContext.getTcaPolicy();
+
+ // Get CEF Event Name
+ final String cefEventName = Optional.ofNullable(eventListener)
+ .map(EventListener::getEvent)
+ .map(Event::getCommonEventHeader)
+ .map(CommonEventHeader::getEventName)
+ .orElseThrow(() -> new TcaProcessingException("Required Field: EventName not present"));
+
+ // Get Policy's metrics per event name matching CEF message event name
+ final MetricsPerEventName policyMetricsPerEventName =
+ tcaPolicy.getMetricsPerEventName().stream()
+ .filter(m -> m.getEventName().equalsIgnoreCase(cefEventName))
+ .findFirst().orElseThrow(() ->
+ new TcaProcessingException("Required Field: MetricsPerEventName not present"));
+
+
+ // get violated policy threshold for cef event name sorted by severity
+ final Optional<Threshold> thresholdOptional =
+ getViolatedThreshold(policyMetricsPerEventName.getThresholds(), cefMessage);
+
+
+ // Check if threshold violation is present
+ if (!thresholdOptional.isPresent()) {
+ final String earlyTerminationMessage = "No Policy Threshold violation detected in CEF Message";
+ setTerminatingMessage(earlyTerminationMessage, tcaExecutionContext, false);
+ return tcaExecutionContext;
+ }
+
+
+ // Threshold violations are present - update tca processing result context
+ final MetricsPerEventName violatedMetricsPerEventName = copyMetricsPerEventName(policyMetricsPerEventName);
+ final Threshold violatedThreshold = thresholdOptional.get();
+ violatedMetricsPerEventName.setThresholds(Collections.singletonList(violatedThreshold));
+ final TcaResultContext tcaResultContext =
+ tcaExecutionContext.getTcaResultContext();
+ tcaResultContext.setViolatedMetricsPerEventName(violatedMetricsPerEventName);
+
+ return tcaExecutionContext;
+ }
+
+
+ /**
+ * Provides violated threshold
+ *
+ * @param policyThresholds policy thresholds that need to applied to incoming cef message
+ * @param cefMessage incoming cef message
+ *
+ * @return list of all violated threshold
+ */
+ private static Optional<Threshold> getViolatedThreshold(final List<Threshold> policyThresholds,
+ final String cefMessage) {
+
+ // map containing key as field path and associated policy thresholds
+ final Map<String, List<Threshold>> policyFieldPathsMap = new LinkedHashMap<>();
+ for (final Threshold policyThreshold : policyThresholds) {
+ if (policyFieldPathsMap.get(policyThreshold.getFieldPath()) == null) {
+ final LinkedList<Threshold> policyThresholdList = new LinkedList<>();
+ policyThresholdList.add(policyThreshold);
+ policyFieldPathsMap.put(policyThreshold.getFieldPath(), policyThresholdList);
+ } else {
+ policyFieldPathsMap.get(policyThreshold.getFieldPath()).add(policyThreshold);
+ }
+ }
+
+ // get map containing key as field path and values for json path
+ final Map<String, List<BigDecimal>> messageFieldValuesMap =
+ getJsonPathValues(cefMessage, policyFieldPathsMap.keySet());
+
+ // if no matching path values - assuming no threshold violations
+ if (messageFieldValuesMap.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // Determine all violated thresholds per message field Path
+ final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
+ for (Map.Entry<String, List<BigDecimal>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
+ final String messageFieldPath = messageFieldValuesMapEntry.getKey();
+ final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
+ if (messageFieldAssociatedPolicyThresholds != null) {
+ final Optional<Threshold> thresholdOptional = computeViolatedThreshold(
+ messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
+ thresholdOptional.ifPresent(threshold -> violatedThresholdsMap.put(messageFieldPath, threshold));
+ }
+ }
+
+ // if multiple fields have violated threshold - grab the first one with max severity
+ return violatedThresholdsMap.values().stream()
+ .sorted(Comparator.comparing(Threshold::getSeverity)).findFirst();
+
+ }
+
+ /**
+ * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
+ * it applies threshold in order of their severity and then by direction and returns first violated threshold
+ *
+ * @param messageFieldValues Field Path Values extracted from CEF Message
+ * @param fieldThresholds Policy Thresholds for Field Path
+ *
+ * @return Optional of violated threshold for a field path
+ */
+ private static Optional<Threshold> computeViolatedThreshold(final List<BigDecimal> messageFieldValues,
+ final List<Threshold> fieldThresholds) {
+
+ // sort thresholds based on severity and then based on direction
+ final List<Threshold> sortedPolicyThresholds = fieldThresholds.stream()
+ .sorted((t1, t2) -> {
+ if (t1.getSeverity().compareTo(t2.getSeverity()) != 0) {
+ return t1.getSeverity().compareTo(t2.getSeverity());
+ } else {
+ return t1.getDirection().compareTo(t2.getDirection());
+ }
+ })
+ .collect(Collectors.toList());
+
+ // Now apply each threshold to field values
+ for (final Threshold policyThreshold : sortedPolicyThresholds) {
+ for (final BigDecimal messageFieldValue : messageFieldValues) {
+ final Boolean isThresholdViolated =
+ policyThreshold.getDirection().operate(messageFieldValue,
+ new BigDecimal(policyThreshold.getThresholdValue()));
+ if (isThresholdViolated) {
+ final Threshold violatedThreshold = copyThreshold(policyThreshold);
+ violatedThreshold.setActualFieldValue(messageFieldValue);
+ return Optional.of(violatedThreshold);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Extracts json path values for given json Field Path from using Json path notation.
+ *
+ * @param message CEF Message
+ * @param jsonFieldPaths Json Field Paths
+ *
+ * @return Map containing key as json path and values as values associated with that json path
+ */
+ private static Map<String, List<BigDecimal>> getJsonPathValues(@Nonnull String message,
+ @Nonnull Set<String> jsonFieldPaths) {
+
+ final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
+ final DocumentContext documentContext = JsonPath.parse(message);
+
+ for (String jsonFieldPath : jsonFieldPaths) {
+ List<BigDecimal> jsonFieldValues;
+
+ try {
+ jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
+ });
+ } catch (Exception e) {
+ final String errorMessage = String.format(
+ "Unable to convert jsonFieldPath value to valid number." +
+ "Json Path: %s.Incoming message: %s", jsonFieldPath, message);
+ throw new TcaProcessingException(errorMessage, e);
+ }
+ // If Json Field Values are present
+ if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
+ // Filter out all null values in the filed values list
+ final List<BigDecimal> nonNullValues =
+ jsonFieldValues.stream().filter(Objects::nonNull).collect(Collectors.toList());
+ // If there are non null values put them in the map
+ if (!nonNullValues.isEmpty()) {
+ jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
+ }
+ }
+ }
+
+ return jsonFieldPathMap;
+ }
+
+ /**
+ * Creates a new threshold hold object by copying the value of given threshold object
+ *
+ * @param threshold threshold that needs to be copied
+ *
+ * @return new threshold object which is copy of given threshold object
+ */
+ private static Threshold copyThreshold(final Threshold threshold) {
+ final Threshold copyThreshold = new Threshold();
+ copyThreshold.setClosedLoopControlName(threshold.getClosedLoopControlName());
+ copyThreshold.setClosedLoopEventStatus(threshold.getClosedLoopEventStatus());
+ copyThreshold.setVersion(threshold.getVersion());
+ copyThreshold.setFieldPath(threshold.getFieldPath());
+ copyThreshold.setThresholdValue(threshold.getThresholdValue());
+ copyThreshold.setDirection(threshold.getDirection());
+ copyThreshold.setSeverity(threshold.getSeverity());
+ return copyThreshold;
+ }
+
+ /**
+ * Returns a copy of metric Per event name without copying thresholds
+ *
+ * @param metricsPerEventName metric per event name that needs to be copied
+ *
+ * @return new metric per event name object which is copy of given object
+ */
+ private static MetricsPerEventName copyMetricsPerEventName(final MetricsPerEventName metricsPerEventName) {
+ final MetricsPerEventName copyMetricsPerEventName = new MetricsPerEventName();
+ copyMetricsPerEventName.setEventName(metricsPerEventName.getEventName());
+ copyMetricsPerEventName.setControlLoopSchemaType(metricsPerEventName.getControlLoopSchemaType());
+ copyMetricsPerEventName.setPolicyScope(metricsPerEventName.getPolicyScope());
+ copyMetricsPerEventName.setPolicyName(metricsPerEventName.getPolicyName());
+ copyMetricsPerEventName.setPolicyVersion(metricsPerEventName.getPolicyVersion());
+ // no thresholds copied
+ return copyMetricsPerEventName;
+ }
+
+}