diff options
author | an4828 <nekrassov@att.com> | 2017-09-26 14:35:17 -0400 |
---|---|---|
committer | an4828 <nekrassov@att.com> | 2017-09-26 14:35:24 -0400 |
commit | 06044df56fb07f4b368888581752855595e7b147 (patch) | |
tree | 6400a3a6ede762887861a621b7fdbfadd25190d5 /dcae-analytics-tca/src/main | |
parent | 475cb8c867038acd73ff540173d54bac3947c610 (diff) |
TCA: Support for VES/A&AI enrichment
Change-Id: I75e0f8e034b9334e918304739e4d73dd12c1ff62
Signed-off-by: an4828 <nekrassov@att.com>
Issue-ID: DCAEGEN2-116
Diffstat (limited to 'dcae-analytics-tca/src/main')
7 files changed, 1575 insertions, 1357 deletions
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java index 6cb8e7f..1540896 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/AbstractTCAECEFPolicyProcessor.java @@ -1,61 +1,61 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; - -/** - * <p> - * Encapsulates common functionality for all TCA CEF Policy Processors - * </p> - * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public abstract class AbstractTCAECEFPolicyProcessor extends AbstractMessageProcessor<TCACEFProcessorContext> { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractTCAECEFPolicyProcessor.class); - - /** - * For all TCA Policy Processor the pre processor ensures that {@link EventListener} object is - * present - * - * @param processorContext incoming Processor Context - * @return Pre processed Processor Context - */ - @Override - public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { - // validates CEF Event Listener is Present - final EventListener cefEventListener = processorContext.getCEFEventListener(); - if (cefEventListener == null) { - final String errorMessage = String.format( - "CEF Event Listener is not Present.Invalid use of Processor: %s. CEF Message: %s", - getProcessorInfo().getProcessorName(), processorContext.getMessage()); - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - return super.preProcessor(processorContext); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
+import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * <p>
+ * Encapsulates common functionality for all TCA CEF Policy Processors
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/9/2016.
+ */
+public abstract class AbstractTCAECEFPolicyProcessor extends AbstractMessageProcessor<TCACEFProcessorContext> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractTCAECEFPolicyProcessor.class);
+
+ /**
+ * For all TCA Policy Processor the pre processor ensures that {@link EventListener} object is
+ * present
+ *
+ * @param processorContext incoming Processor Context
+ * @return Pre processed Processor Context
+ */
+ @Override
+ public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) {
+ // validates CEF Event Listener is Present
+ final EventListener cefEventListener = processorContext.getCEFEventListener();
+ if (cefEventListener == null) {
+ final String errorMessage = String.format(
+ "CEF Event Listener is not Present.Invalid use of Processor: %s. CEF Message: %s",
+ getProcessorInfo().getProcessorName(), processorContext.getMessage());
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return super.preProcessor(processorContext);
+ }
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java index 944eba6..943270e 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFJsonProcessor.java @@ -1,98 +1,98 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.apache.commons.lang3.StringUtils; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - *<p> - * Processor that converts incoming presumed JSON string CEF message to {@link EventListener} object - * <br> - * Pre Conditions: None - *</p> - * - * @author Rajiv Singla . Creation Date: 11/5/2016. - */ -public class TCACEFJsonProcessor extends AbstractMessageProcessor<TCACEFProcessorContext> { - - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(TCACEFJsonProcessor.class); - - - @Override - public String getProcessorDescription() { - return "Converts incoming TCA CEF Message to Event Listener object"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - final String cefMessage = processorContext.getMessage(); - - // If CEF Message is null then processor should stop processing - if (cefMessage == null) { - String errorMessage = "Null CEF message cannot be converted to CEF Event Listener Object"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - // If CEF Message is blank then processor stop processing - if (StringUtils.isBlank(cefMessage)) { - setTerminatingProcessingMessage("Blank CEF message cannot be converted to CEF Event Listener Object", - processorContext); - return processorContext; - } - - // trim cef message - final String trimmedCEFMessage = cefMessage.trim(); - - // if message does not start with curly brace and ends with curly brace, it is not a valid cef message - // processor will stop processing - if (!(trimmedCEFMessage.startsWith("{") && trimmedCEFMessage.endsWith("}"))) { - setTerminatingProcessingMessage("CEF Message must start with curly brace and must end with curly brace", - processorContext); - return processorContext; - } - - // try parsing the cef message - try { - final EventListener eventListener = TCAUtils.readValue(trimmedCEFMessage, EventListener.class); - setFinishedProcessingMessage("CEF JSON to Event Listener Conversion Successful", processorContext); - // set new Event Listener in the Processor Context - processorContext.setCEFEventListener(eventListener); - return processorContext; - } catch (IOException e) { - final String errorMessage = String.format("Parsing Failed for CEF Message: %s, Error: %s", cefMessage, e); - // If parsing fails throw an exception - throw new MessageProcessingException(errorMessage, LOG, e); - } - - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
+import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ *<p>
+ * Processor that converts incoming presumed JSON string CEF message to {@link EventListener} object
+ * <br>
+ * Pre Conditions: None
+ *</p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/5/2016.
+ */
+public class TCACEFJsonProcessor extends AbstractMessageProcessor<TCACEFProcessorContext> {
+
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCACEFJsonProcessor.class);
+
+
+ @Override
+ public String getProcessorDescription() {
+ return "Converts incoming TCA CEF Message to Event Listener object";
+ }
+
+ @Override
+ public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) {
+
+ final String cefMessage = processorContext.getMessage();
+
+ // If CEF Message is null then processor should stop processing
+ if (cefMessage == null) {
+ String errorMessage = "Null CEF message cannot be converted to CEF Event Listener Object";
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+
+ // If CEF Message is blank then processor stop processing
+ if (StringUtils.isBlank(cefMessage)) {
+ setTerminatingProcessingMessage("Blank CEF message cannot be converted to CEF Event Listener Object",
+ processorContext);
+ return processorContext;
+ }
+
+ // trim cef message
+ final String trimmedCEFMessage = cefMessage.trim();
+
+ // if message does not start with curly brace and ends with curly brace, it is not a valid cef message
+ // processor will stop processing
+ if (!(trimmedCEFMessage.startsWith("{") && trimmedCEFMessage.endsWith("}"))) {
+ setTerminatingProcessingMessage("CEF Message must start with curly brace and must end with curly brace",
+ processorContext);
+ return processorContext;
+ }
+
+ // try parsing the cef message
+ try {
+ final EventListener eventListener = TCAUtils.readValue(trimmedCEFMessage, EventListener.class);
+ setFinishedProcessingMessage("CEF JSON to Event Listener Conversion Successful", processorContext);
+ // set new Event Listener in the Processor Context
+ processorContext.setCEFEventListener(eventListener);
+ return processorContext;
+ } catch (IOException e) {
+ final String errorMessage = String.format("Parsing Failed for CEF Message: %s, Error: %s", cefMessage, e);
+ // If parsing fails throw an exception
+ throw new MessageProcessingException(errorMessage, LOG, e);
+ }
+
+ }
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java index cd29ed1..3819d2c 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyDomainFilter.java @@ -1,84 +1,84 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; - -/** - * <p> - * TCA Processor which acts like a filter to filter out messages which does not belong to TCA Policy Domain - * <br> - * Pre Conditions: CEF Event Listener must be present - * </p> - * - * @author Rajiv Singla . Creation Date: 11/7/2016. - */ -public class TCACEFPolicyDomainFilter extends AbstractTCAECEFPolicyProcessor { - - - private static final long serialVersionUID = 1L; - - @Override - public String getProcessorDescription() { - return "Filters out CEF Messages which does not match TCAPolicy Domain"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - // Safe to get event Listener here without null check as pre processor will validate if - // event listener is indeed present - final EventListener eventListener = processorContext.getCEFEventListener(); - - Domain cefMessageDomain; - - // Extract CEF domain as it is must be present as per CEF Schema - if (eventListener.getEvent() != null && - eventListener.getEvent().getCommonEventHeader() != null && - eventListener.getEvent().getCommonEventHeader().getDomain() != null) { - cefMessageDomain = eventListener.getEvent().getCommonEventHeader().getDomain(); - - } else { - final String terminatingMessage = "Invalid CEF Message.Common Event Header Domain not present."; - setTerminatingProcessingMessage(terminatingMessage, processorContext); - return processorContext; - } - - // Get Policy Domain. TCA Policy Validation must ensure that Domain is indeed present - // no null check will be required here - final String policyDomain = processorContext.getTCAPolicy().getDomain(); - - // If Policy domain matches CEF message domain then continue processing - if (cefMessageDomain.toString().equalsIgnoreCase(policyDomain)) { - final String finishMessage = String.format("Policy Domain and CEF Message Domain match successful." + - " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); - setFinishedProcessingMessage(finishMessage, processorContext); - } else { - // If policy domain does not match with CEF message terminate processing chain - final String terminatingMessage = String.format("Policy Domain and CEF Message Domain match unsuccessful." + - " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain); - setTerminatingProcessingMessage(terminatingMessage, processorContext); - } - - return processorContext; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+
+/**
+ * <p>
+ * TCA Processor which acts like a filter to filter out messages which does not belong to TCA Policy Domain
+ * <br>
+ * Pre Conditions: CEF Event Listener must be present
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/7/2016.
+ */
+public class TCACEFPolicyDomainFilter extends AbstractTCAECEFPolicyProcessor {
+
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getProcessorDescription() {
+ return "Filters out CEF Messages which does not match TCAPolicy Domain";
+ }
+
+ @Override
+ public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) {
+
+ // Safe to get event Listener here without null check as pre processor will validate if
+ // event listener is indeed present
+ final EventListener eventListener = processorContext.getCEFEventListener();
+
+ Domain cefMessageDomain;
+
+ // Extract CEF domain as it is must be present as per CEF Schema
+ if (eventListener.getEvent() != null &&
+ eventListener.getEvent().getCommonEventHeader() != null &&
+ eventListener.getEvent().getCommonEventHeader().getDomain() != null) {
+ cefMessageDomain = eventListener.getEvent().getCommonEventHeader().getDomain();
+
+ } else {
+ final String terminatingMessage = "Invalid CEF Message.Common Event Header Domain not present.";
+ setTerminatingProcessingMessage(terminatingMessage, processorContext);
+ return processorContext;
+ }
+
+ // Get Policy Domain. TCA Policy Validation must ensure that Domain is indeed present
+ // no null check will be required here
+ final String policyDomain = processorContext.getTCAPolicy().getDomain();
+
+ // If Policy domain matches CEF message domain then continue processing
+ if (cefMessageDomain.toString().equalsIgnoreCase(policyDomain)) {
+ final String finishMessage = String.format("Policy Domain and CEF Message Domain match successful." +
+ " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain);
+ setFinishedProcessingMessage(finishMessage, processorContext);
+ } else {
+ // If policy domain does not match with CEF message terminate processing chain
+ final String terminatingMessage = String.format("Policy Domain and CEF Message Domain match unsuccessful." +
+ " Message Domain: %s, Policy Domain: %s", cefMessageDomain, policyDomain);
+ setTerminatingProcessingMessage(terminatingMessage, processorContext);
+ }
+
+ return processorContext;
+ }
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java index 689b06c..50bd92b 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyEventNameFilter.java @@ -1,91 +1,91 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import com.google.common.base.Joiner; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; - -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.getPolicyEventNamesSupplier; - -/** - * <p> - * TCA Processor that acts like a filter to filter out messages which does not belong to TCA Policy Event Name - * <br> - * Pre Conditions: CEF Event Listener must be present - * </p> - * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFPolicyEventNameFilter extends AbstractTCAECEFPolicyProcessor { - - private static final long serialVersionUID = 1L; - - @Override - public String getProcessorDescription() { - return "Filters out CEF Messages which does not match Policy Functional Roles"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - // Safe to get event Listener here without null check as pre processor will validate if - // event listener is indeed present - final EventListener eventListener = processorContext.getCEFEventListener(); - - String cefMessageEventName; - - if (eventListener.getEvent() != null && - eventListener.getEvent().getCommonEventHeader() != null && - eventListener.getEvent().getCommonEventHeader().getEventName() != null) { - cefMessageEventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - } else { - String terminationMessage = "Invalid CEF Message.Common Event Header Event Name not present."; - setTerminatingProcessingMessage(terminationMessage, processorContext); - return processorContext; - } - - // Determine Policy Functional Roles - final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); - final List<String> policyFunctionalRoles = getPolicyEventNamesSupplier(tcaPolicy).get(); - final String policyFunctionalRolesString = Joiner.on(",").join(policyFunctionalRoles); - - // If Policy functional Roles contains CEF message Functional Role then continue processing - if (policyFunctionalRoles.contains(cefMessageEventName)) { - final String finishMessage = String.format( - "Policy Functional Roles and CEF Message Functional match successful." + - "Message Functional Role: %s, Policy Functional Roles: %s", - cefMessageEventName, policyFunctionalRolesString); - setFinishedProcessingMessage(finishMessage, processorContext); - } else { - // If Policy functional Roles does not contain CEF message Functiona Role then terminate processing - final String terminatingMessage = String.format( - "Policy Domain and CEF Message Domain match unsuccessful." + - "Message Functional Role: %s, Policy Functional Roles: %s", - cefMessageEventName, policyFunctionalRolesString); - setTerminatingProcessingMessage(terminatingMessage, processorContext); - } - - return processorContext; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import com.google.common.base.Joiner;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.getPolicyEventNamesSupplier;
+
+/**
+ * <p>
+ * TCA Processor that acts like a filter to filter out messages which does not belong to TCA Policy Event Name
+ * <br>
+ * Pre Conditions: CEF Event Listener must be present
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/9/2016.
+ */
+public class TCACEFPolicyEventNameFilter extends AbstractTCAECEFPolicyProcessor {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getProcessorDescription() {
+ return "Filters out CEF Messages which does not match Policy Functional Roles";
+ }
+
+ @Override
+ public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) {
+
+ // Safe to get event Listener here without null check as pre processor will validate if
+ // event listener is indeed present
+ final EventListener eventListener = processorContext.getCEFEventListener();
+
+ String cefMessageEventName;
+
+ if (eventListener.getEvent() != null &&
+ eventListener.getEvent().getCommonEventHeader() != null &&
+ eventListener.getEvent().getCommonEventHeader().getEventName() != null) {
+ cefMessageEventName = eventListener.getEvent().getCommonEventHeader().getEventName();
+ } else {
+ String terminationMessage = "Invalid CEF Message.Common Event Header Event Name not present.";
+ setTerminatingProcessingMessage(terminationMessage, processorContext);
+ return processorContext;
+ }
+
+ // Determine Policy Functional Roles
+ final TCAPolicy tcaPolicy = processorContext.getTCAPolicy();
+ final List<String> policyFunctionalRoles = getPolicyEventNamesSupplier(tcaPolicy).get();
+ final String policyFunctionalRolesString = Joiner.on(",").join(policyFunctionalRoles);
+
+ // If Policy functional Roles contains CEF message Functional Role then continue processing
+ if (policyFunctionalRoles.contains(cefMessageEventName)) {
+ final String finishMessage = String.format(
+ "Policy Functional Roles and CEF Message Functional match successful." +
+ "Message Functional Role: %s, Policy Functional Roles: %s",
+ cefMessageEventName, policyFunctionalRolesString);
+ setFinishedProcessingMessage(finishMessage, processorContext);
+ } else {
+ // If Policy functional Roles does not contain CEF message Functiona Role then terminate processing
+ final String terminatingMessage = String.format(
+ "Policy Domain and CEF Message Domain match unsuccessful." +
+ "Message Functional Role: %s, Policy Functional Roles: %s",
+ cefMessageEventName, policyFunctionalRolesString);
+ setTerminatingProcessingMessage(terminatingMessage, processorContext);
+ }
+
+ return processorContext;
+ }
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java index f02a4bb..704151d 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFPolicyThresholdsProcessor.java @@ -1,136 +1,136 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import com.google.common.base.Optional; -import com.google.common.collect.Table; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nonnull; - -/** - *<p> - * TCA CEF Policy Threshold processor - * <br> - * Pre Conditions: Domain and Functional Role must be present in CEF Event Listener Object - *</p> - * - * @author Rajiv Singla . Creation Date: 11/9/2016. - */ -public class TCACEFPolicyThresholdsProcessor extends AbstractTCAECEFPolicyProcessor { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(TCACEFPolicyThresholdsProcessor.class); - - @Override - public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) { - // validates Domain and Functional Role are present - final EventListener eventListener = processorContext.getCEFEventListener(); - final Domain domain = eventListener.getEvent().getCommonEventHeader().getDomain(); - final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - if (domain == null || eventName == null) { - final String errorMessage = "CEF Event Listener domain or eventName not Present. " + - "Invalid use of this Processor"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - return super.preProcessor(processorContext); - } - - @Override - public String getProcessorDescription() { - return "Applies TCA Policy rules to incoming CEF message. If any thresholds are violated attaches max " + - "Severity violated threshold to TCA Processor Context"; - } - - @Override - public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) { - - final String cefMessage = processorContext.getMessage(); - - // Determine domain and eventName - final EventListener eventListener = processorContext.getCEFEventListener(); - final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName(); - - // Get Table containing event Name and Thresholds Field Path - final TCAPolicy tcaPolicy = processorContext.getTCAPolicy(); - final Table<String, String, List<Threshold>> eventNameFieldPathsTable = - TCAUtils.getPolicyEventNameThresholdsTableSupplier(tcaPolicy).get(); - - // Get Policy Field Paths for that event Name - final Map<String, List<Threshold>> policyFieldPathsMap = eventNameFieldPathsTable.row(eventName); - final Set<String> policyFieldPaths = policyFieldPathsMap.keySet(); - - // Get Json Values for Policy Fields - final Map<String, List<Long>> messageFieldValuesMap = TCAUtils.getJsonPathValue(cefMessage, policyFieldPaths); - - // Determine all violated thresholds per message field Path - final Map<String, Threshold> violatedThresholdsMap = new HashMap<>(); - for (Map.Entry<String, List<Long>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) { - final String messageFieldPath = messageFieldValuesMapEntry.getKey(); - final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath); - if (messageFieldAssociatedPolicyThresholds != null) { - final Optional<Threshold> thresholdOptional = TCAUtils.thresholdCalculator( - messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds); - if (thresholdOptional.isPresent()) { - violatedThresholdsMap.put(messageFieldPath, thresholdOptional.get()); - } - } - } - - // No threshold were violated - if (violatedThresholdsMap.isEmpty()) { - - final String terminationMessage = "No Policy Threshold violated by the VES CEF Message."; - setTerminatingProcessingMessage(terminationMessage, processorContext); - - } else { - - // If there are policy violations then determine max priority violation - final Threshold maxSeverityThresholdViolation = - TCAUtils.prioritizeThresholdViolations(violatedThresholdsMap); - final MetricsPerEventName violatedMetrics = TCAUtils.createViolatedMetrics(tcaPolicy, - maxSeverityThresholdViolation, eventName); - // attach policy violation to processor Context - processorContext.setMetricsPerEventName(violatedMetrics); - - final String finishMessage = String.format("Policy Threshold violation detected for threshold: %s", - maxSeverityThresholdViolation); - setFinishedProcessingMessage(finishMessage, processorContext); - - } - - return processorContext; - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Table;
+import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+/**
+ *<p>
+ * TCA CEF Policy Threshold processor
+ * <br>
+ * Pre Conditions: Domain and Functional Role must be present in CEF Event Listener Object
+ *</p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/9/2016.
+ */
+public class TCACEFPolicyThresholdsProcessor extends AbstractTCAECEFPolicyProcessor {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCACEFPolicyThresholdsProcessor.class);
+
+ @Override
+ public TCACEFProcessorContext preProcessor(@Nonnull TCACEFProcessorContext processorContext) {
+ // validates Domain and Functional Role are present
+ final EventListener eventListener = processorContext.getCEFEventListener();
+ final Domain domain = eventListener.getEvent().getCommonEventHeader().getDomain();
+ final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName();
+ if (domain == null || eventName == null) {
+ final String errorMessage = "CEF Event Listener domain or eventName not Present. " +
+ "Invalid use of this Processor";
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+ return super.preProcessor(processorContext);
+ }
+
+ @Override
+ public String getProcessorDescription() {
+ return "Applies TCA Policy rules to incoming CEF message. If any thresholds are violated attaches max " +
+ "Severity violated threshold to TCA Processor Context";
+ }
+
+ @Override
+ public TCACEFProcessorContext processMessage(TCACEFProcessorContext processorContext) {
+
+ final String cefMessage = processorContext.getMessage();
+
+ // Determine domain and eventName
+ final EventListener eventListener = processorContext.getCEFEventListener();
+ final String eventName = eventListener.getEvent().getCommonEventHeader().getEventName();
+
+ // Get Table containing event Name and Thresholds Field Path
+ final TCAPolicy tcaPolicy = processorContext.getTCAPolicy();
+ final Table<String, String, List<Threshold>> eventNameFieldPathsTable =
+ TCAUtils.getPolicyEventNameThresholdsTableSupplier(tcaPolicy).get();
+
+ // Get Policy Field Paths for that event Name
+ final Map<String, List<Threshold>> policyFieldPathsMap = eventNameFieldPathsTable.row(eventName);
+ final Set<String> policyFieldPaths = policyFieldPathsMap.keySet();
+
+ // Get Json Values for Policy Fields
+ final Map<String, List<Long>> messageFieldValuesMap = TCAUtils.getJsonPathValue(cefMessage, policyFieldPaths);
+
+ // Determine all violated thresholds per message field Path
+ final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
+ for (Map.Entry<String, List<Long>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
+ final String messageFieldPath = messageFieldValuesMapEntry.getKey();
+ final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
+ if (messageFieldAssociatedPolicyThresholds != null) {
+ final Optional<Threshold> thresholdOptional = TCAUtils.thresholdCalculator(
+ messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
+ if (thresholdOptional.isPresent()) {
+ violatedThresholdsMap.put(messageFieldPath, thresholdOptional.get());
+ }
+ }
+ }
+
+ // No threshold were violated
+ if (violatedThresholdsMap.isEmpty()) {
+
+ final String terminationMessage = "No Policy Threshold violated by the VES CEF Message.";
+ setTerminatingProcessingMessage(terminationMessage, processorContext);
+
+ } else {
+
+ // If there are policy violations then determine max priority violation
+ final Threshold maxSeverityThresholdViolation =
+ TCAUtils.prioritizeThresholdViolations(violatedThresholdsMap);
+ final MetricsPerEventName violatedMetrics = TCAUtils.createViolatedMetrics(tcaPolicy,
+ maxSeverityThresholdViolation, eventName);
+ // attach policy violation to processor Context
+ processorContext.setMetricsPerEventName(violatedMetrics);
+
+ final String finishMessage = String.format("Policy Threshold violation detected for threshold: %s",
+ maxSeverityThresholdViolation);
+ setFinishedProcessingMessage(finishMessage, processorContext);
+
+ }
+
+ return processorContext;
+ }
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java index 207d1e1..80dfca0 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/processor/TCACEFProcessorContext.java @@ -1,103 +1,103 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.processor; - -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractProcessorContext; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; - -/** - * TCA CEF Policy Processor Context - * - * @author Rajiv Singla . Creation Date: 11/7/2016. - */ -public class TCACEFProcessorContext extends AbstractProcessorContext { - - private static final long serialVersionUID = 1L; - - private final TCAPolicy tcaPolicy; - private EventListener eventListener; - private MetricsPerEventName metricsPerEventName; - - public TCACEFProcessorContext(final String message, boolean canProcessingContinue, final TCAPolicy tcaPolicy) { - super(message, canProcessingContinue); - this.tcaPolicy = tcaPolicy; - // present only if cef incoming message can be parsed successfully to Event Listener Object - this.eventListener = null; - // present only if there are any threshold violations are detected - this.metricsPerEventName = null; - } - - // Auxiliary Constructor which default canProcessingContinue Flag to true - public TCACEFProcessorContext(final String message, final TCAPolicy tcaPolicy) { - this(message, true, tcaPolicy); - } - - /** - * Returns {@link TCAPolicy} Object - * - * @return TCA Policy - */ - public TCAPolicy getTCAPolicy() { - return tcaPolicy; - } - - /** - * Returns Common Event Format {@link EventListener} if present else null - * - * @return CEF Event Listener - */ - public EventListener getCEFEventListener() { - return eventListener; - } - - - /** - * Sets new {@link EventListener} - * - * @param eventListener set new value for CEF event listener - */ - public void setCEFEventListener(final EventListener eventListener) { - this.eventListener = eventListener; - } - - - /** - * Returns TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message if - * present else null - * - * @return Violated Threshold - */ - public MetricsPerEventName getMetricsPerEventName() { - return metricsPerEventName; - } - - /** - * Assign new TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message - * - * @param metricsPerEventName new value for Metrics Per Functional Role with violated threshold - */ - public void setMetricsPerEventName(MetricsPerEventName metricsPerEventName) { - this.metricsPerEventName = metricsPerEventName; - } - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.processor;
+
+import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractProcessorContext;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+
+/**
+ * TCA CEF Policy Processor Context
+ *
+ * @author Rajiv Singla . Creation Date: 11/7/2016.
+ */
+public class TCACEFProcessorContext extends AbstractProcessorContext {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TCAPolicy tcaPolicy;
+ private EventListener eventListener;
+ private MetricsPerEventName metricsPerEventName;
+
+ public TCACEFProcessorContext(final String message, boolean canProcessingContinue, final TCAPolicy tcaPolicy) {
+ super(message, canProcessingContinue);
+ this.tcaPolicy = tcaPolicy;
+ // present only if cef incoming message can be parsed successfully to Event Listener Object
+ this.eventListener = null;
+ // present only if there are any threshold violations are detected
+ this.metricsPerEventName = null;
+ }
+
+ // Auxiliary Constructor which default canProcessingContinue Flag to true
+ public TCACEFProcessorContext(final String message, final TCAPolicy tcaPolicy) {
+ this(message, true, tcaPolicy);
+ }
+
+ /**
+ * Returns {@link TCAPolicy} Object
+ *
+ * @return TCA Policy
+ */
+ public TCAPolicy getTCAPolicy() {
+ return tcaPolicy;
+ }
+
+ /**
+ * Returns Common Event Format {@link EventListener} if present else null
+ *
+ * @return CEF Event Listener
+ */
+ public EventListener getCEFEventListener() {
+ return eventListener;
+ }
+
+
+ /**
+ * Sets new {@link EventListener}
+ *
+ * @param eventListener set new value for CEF event listener
+ */
+ public void setCEFEventListener(final EventListener eventListener) {
+ this.eventListener = eventListener;
+ }
+
+
+ /**
+ * Returns TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message if
+ * present else null
+ *
+ * @return Violated Threshold
+ */
+ public MetricsPerEventName getMetricsPerEventName() {
+ return metricsPerEventName;
+ }
+
+ /**
+ * Assign new TCA Policy {@link MetricsPerEventName} which was has violated Threshold for the CEF Message
+ *
+ * @param metricsPerEventName new value for Metrics Per Functional Role with violated threshold
+ */
+ public void setMetricsPerEventName(MetricsPerEventName metricsPerEventName) {
+ this.metricsPerEventName = metricsPerEventName;
+ }
+
+}
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java index cd61e65..dd37aa2 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/utils/TCAUtils.java @@ -1,784 +1,1002 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.tca.utils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Table; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.JsonPath; -import com.jayway.jsonpath.TypeRef; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; -import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor; -import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor; -import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction; -import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType; -import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader; -import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; -import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter; -import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; -import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI; -import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils; -import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; -import org.quartz.Job; -import org.quartz.JobBuilder; -import org.quartz.JobDataMap; -import org.quartz.JobDetail; -import org.quartz.Scheduler; -import org.quartz.SchedulerException; -import org.quartz.SimpleScheduleBuilder; -import org.quartz.SimpleTrigger; -import org.quartz.TriggerBuilder; -import org.quartz.impl.StdSchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.UUID; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import static com.google.common.collect.Lists.newArrayList; -import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT; - -/** - * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get - * pre configured Json Object Mapper understand serialization and deserialization of CEF Message - * and TCA Policy - * - * @author Rajiv Singla . Creation Date: 10/24/2016. - */ -public abstract class TCAUtils extends AnalyticsModelJsonUtils { - - private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class); - - /** - * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR, - * WARNING ) - */ - private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() { - @Override - public int compare(Threshold threshold1, Threshold threshold2) { - return threshold1.getSeverity().compareTo(threshold2.getSeverity()); - } - }; - - /** - * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy} - * - * @return TCA Policy Metrics Per Event Name list - */ - public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() { - return new Function<TCAPolicy, List<MetricsPerEventName>>() { - @Nullable - @Override - public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) { - return tcaPolicy.getMetricsPerEventName(); - } - }; - } - - /** - * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from - * {@link MetricsPerEventName} - * - * @return Event Names or a Metrics Per Event Name object - */ - public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() { - return new Function<MetricsPerEventName, String>() { - @Override - public String apply(@Nonnull MetricsPerEventName metricsPerEventName) { - return metricsPerEventName.getEventName(); - } - }; - } - - - /** - * Extracts {@link TCAPolicy} Event Names - * - * @param tcaPolicy TCA Policy - * @return List of event names in the TCA Policy - */ - public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) { - final List<MetricsPerEventName> metricsPerEventNames = - tcaPolicyMetricsExtractorFunction().apply(tcaPolicy); - - return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction()); - } - - /** - * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to - * change during runtime - * - * @param tcaPolicy TCA Policy - * @return a Supplier that memoize the TCA Policy event names - */ - public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) { - return Suppliers.memoize(new Supplier<List<String>>() { - @Override - public List<String> get() { - return getPolicyEventNames(tcaPolicy); - } - }); - } - - - /** - * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path - * - * @param tcaPolicy TCA Policy - * @return A table with Keys of event name and field path containing List of threshold as values - */ - public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) { - final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create(); - for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) { - final String eventName = metricsPerEventName.getEventName(); - final List<Threshold> thresholds = metricsPerEventName.getThresholds(); - for (Threshold threshold : thresholds) { - final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath()); - if (existingThresholds == null) { - final LinkedList<Threshold> newThresholdList = new LinkedList<>(); - newThresholdList.add(threshold); - domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList); - } else { - domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold); - } - } - } - return domainFRTable; - } - - - /** - * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table - * - * @param tcaPolicy TCA Policy - * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values - */ - public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier - (final TCAPolicy tcaPolicy) { - return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() { - @Override - public Table<String, String, List<Threshold>> get() { - return getPolicyEventNameThresholdsTable(tcaPolicy); - } - }); - } - - - /** - * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor}, - * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to - * filter out messages which does not match policy domain or event Name - * - * @param cefMessage CEF Message - * @param tcaPolicy TCA Policy - * @return Message Process Context after processing filter chain - */ - public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage, - @Nonnull final TCAPolicy tcaPolicy) { - - final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor(); - final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter(); - final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter(); - // Create a list of message processors - final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors = - ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter); - final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy); - // Create a message processors chain - final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain = - new GenericMessageChainProcessor<>(messageProcessors, processorContext); - // process chain - return tcaProcessingChain.processChain(); - } - - - /** - * Extracts json path values for given json Field Paths from using Json path notation. Assumes - * that values extracted are always long - * - * @param message CEF Message - * @param jsonFieldPaths Json Field Paths - * @return Map containing key as json path and values as values associated with that json path - */ - public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String> - jsonFieldPaths) { - - final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>(); - final DocumentContext documentContext = JsonPath.parse(message); - - for (String jsonFieldPath : jsonFieldPaths) { - final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() { - }); - // If Json Field Values are not or empty - if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) { - // Filter out all null values in the filed values list - final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues, - Predicates.<Long>notNull())); - // If there are non null values put them in the map - if (!nonNullValues.isEmpty()) { - jsonFieldPathMap.put(jsonFieldPath, nonNullValues); - } - } - } - - return jsonFieldPathMap; - } - - /** - * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path - * it applies threshold in order of their severity and record the first threshold per message field path - * - * @param messageFieldValues Field Path Values extracted from CEF Message - * @param fieldThresholds Policy Thresholds for Field Path - * @return Optional of violated threshold for a field path - */ - public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold> - fieldThresholds) { - // order thresholds by severity - Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR); - // Now apply each threshold to field values - for (Threshold fieldThreshold : fieldThresholds) { - for (Long messageFieldValue : messageFieldValues) { - final Boolean isThresholdViolated = - fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue()); - if (isThresholdViolated) { - final Threshold violatedThreshold = Threshold.copy(fieldThreshold); - violatedThreshold.setActualFieldValue(messageFieldValue); - return Optional.of(violatedThreshold); - } - } - } - return Optional.absent(); - } - - /** - * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message. - * Grabs first highest priority violated threshold - * - * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds - * @return First Highest priority violated threshold - */ - public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) { - - final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values()); - - if (violatedThresholds.size() == 1) { - return violatedThresholds.get(0); - } - Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR); - // Just grab the first violated threshold with highest priority - return violatedThresholds.get(0); - } - - - /** - * Creates {@link MetricsPerEventName} object which contains violated thresholds - * - * @param tcaPolicy TCA Policy - * @param violatedThreshold Violated thresholds - * @param eventName Event Name - * - * @return MetricsPerEventName object containing one highest severity violated threshold - */ - public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy, - @Nonnull final Threshold violatedThreshold, - @Nonnull final String eventName) { - - final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList( - Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() { - @Override - public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) { - return metricsPerEventName.getEventName().equals(eventName); - } - })); - // TCA policy must have only one metrics per event Name - if (metricsPerEventNames.size() == 1) { - final MetricsPerEventName violatedMetrics = - MetricsPerEventName.copy(metricsPerEventNames.get(0)); - violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold))); - return violatedMetrics; - } else { - final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName); - throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage)); - } - } - - /** - * Computes threshold violations - * - * @param processorContext Filtered processor Context - * @return processor context with any threshold violations - */ - public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) { - final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor(); - return policyThresholdsProcessor.apply(processorContext); - } - - - /** - * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse} - * formats - * - * @param processorContextWithViolations processor context which has TCA violations - * @param tcaAppName tca app name - * @param isAlertInCEFFormat determines if output alert is in CEF format - * - * @return TCA Alert String - * - * @throws JsonProcessingException If alert cannot be parsed into JSON String - */ - public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations, - final String tcaAppName, - final Boolean isAlertInCEFFormat) throws JsonProcessingException { - if (isAlertInCEFFormat != null && isAlertInCEFFormat) { - final EventListener eventListenerWithViolations = - addThresholdViolationFields(processorContextWithViolations); - final String alertString = writeValueAsString(eventListenerWithViolations); - LOG.debug("Created alert in CEF Format: {}", alertString); - return alertString; - } else { - final TCAVESResponse newTCAVESResponse = - createNewTCAVESResponse(processorContextWithViolations, tcaAppName); - final String alertString = writeValueAsString(newTCAVESResponse); - LOG.debug("Created alert in Non CEF Format: {}", alertString); - return alertString; - } - } - - /** - * Adds threshold violation fields to {@link EventListener} - * - * @param processorContextWithViolations processor context that contains violations - * @return event listener with threshold crossing alert fields populated - */ - public static EventListener addThresholdViolationFields( - final TCACEFProcessorContext processorContextWithViolations) { - - final MetricsPerEventName metricsPerEventName = - processorContextWithViolations.getMetricsPerEventName(); - // confirm violations are indeed present - if (metricsPerEventName == null) { - final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - // get violated threshold - final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); - final EventListener eventListener = processorContextWithViolations.getCEFEventListener(); - final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); - - // create new threshold crossing alert fields - final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields(); - thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString()); - thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity()); - thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date())); - thresholdCrossingAlertFields.setAlertAction(AlertAction.SET); - thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY); - thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString()); - thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName()); - thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName()); - - // create new performance count - final PerformanceCounter performanceCounter = new PerformanceCounter(); - performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity())); - performanceCounter.setName(violatedThreshold.getFieldPath()); - performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString()); - performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString()); - - // set additional parameters for threshold crossing alert fields - thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter)); - - // add threshold crossing fields to existing event listener - eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields); - - return eventListener; - } - - /** - * Converts {@link EventSeverity} to {@link Criticality} - * - * @param eventSeverity event severity - * - * @return performance counter criticality - */ - private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) { - switch (eventSeverity) { - case CRITICAL: - return Criticality.CRIT; - case MAJOR: - return Criticality.MAJ; - default: - return Criticality.UNKNOWN; - } - } - - /** - * Creates {@link TCAVESResponse} object - * - * @param processorContext processor Context with violations - * @param tcaAppName TCA App Name - * - * @return TCA VES Response Message - */ - public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext, - final String tcaAppName) { - - final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName(); - // confirm violations are indeed present - if (metricsPerEventName == null) { - final String errorMessage = "No violations metrics. Unable to create VES Response"; - throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } - - final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0); - final EventListener eventListener = processorContext.getCEFEventListener(); - final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader(); - - final TCAVESResponse tcavesResponse = new TCAVESResponse(); - // ClosedLoopControlName included in the DCAE configuration Policy - tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName()); - // version included in the DCAE configuration Policy - tcavesResponse.setVersion(violatedThreshold.getVersion()); - // Generate a UUID for this output message - tcavesResponse.setRequestID(UUID.randomUUID().toString()); - // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message - tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec()); - // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot - // TODO: Find out how to get this field - tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName); - - final AAI aai = new AAI(); - tcavesResponse.setAai(aai); - - // VM specific settings - if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) { - // Hard Coded - "VM" - tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE); - // Hard Coded - "vserver.vserver-name" - tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET); - aai.setGenericServerId(commonEventHeader.getReportingEntityName()); - } else { - // VNF specific settings - // Hard Coded - "VNF" - tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE); - // Hard Coded - "generic-vnf.vnf-id" - tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET); - // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for - // the data element used in A&AI) - aai.setGenericVNFId(commonEventHeader.getReportingEntityName()); - } - - // Hard Coded - "DCAE" - tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM); - // policyScope included in the DCAE configuration Policy - tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope()); - // policyName included in the DCAE configuration Policy - tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName()); - // policyVersion included in the DCAE configuration Policy - tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion()); - // Extracted from violated threshold - tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name()); - - return tcavesResponse; - } - - - /** - * Extract Domain and Event Name from processor context if present - * - * @param processorContext processor context - * @return Tuple of domain and event Name - */ - public static Pair<String, String> getDomainAndEventName( - @Nullable final TCACEFProcessorContext processorContext) { - - String domain = null; - String eventName = null; - - if (processorContext != null && - processorContext.getCEFEventListener() != null && - processorContext.getCEFEventListener().getEvent() != null && - processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) { - final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent() - .getCommonEventHeader(); - - if (commonEventHeader.getDomain() != null) { - domain = commonEventHeader.getDomain().name(); - } - - if (commonEventHeader.getEventName() != null) { - eventName = commonEventHeader.getEventName(); - } - - } - - return new ImmutablePair<>(domain, eventName); - - } - - /** - * Creates {@link TCAPolicy} Metrics per Event Name list - * - * @param eventNamesMap Map containing event Name as key and corresponding values - * - * @return List of {@link MetricsPerEventName} - */ - public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList( - final Map<String, Map<String, String>> eventNamesMap) { - - // create a new metrics per event Name list - final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>(); - - for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) { - - // create new metrics per event Name instance - final MetricsPerEventName newMetricsPerEventName = - createNewMetricsPerEventName(eventNamesEntry); - metricsPerEventNames.add(newMetricsPerEventName); - - // determine all threshold related values - final Map<String, String> thresholdsValuesMaps = - filterMapByKeyNamePrefix(eventNamesEntry.getValue(), - AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX); - - // create a map of all threshold values - final Map<String, Map<String, String>> thresholdsMap = - extractSubTree(thresholdsValuesMaps, 1, 2, - AnalyticsConstants.TCA_POLICY_DELIMITER); - - // add thresholds to nmetrics per event Names threshold list - for (Map<String, String> thresholdMap : thresholdsMap.values()) { - newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap)); - } - - } - - return metricsPerEventNames; - } - - /** - * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap - * - * @param thresholdMap threshold map with threshold values - * - * @return new instance of TCA Policy Threshold - */ - public static Threshold createNewThreshold(final Map<String, String> thresholdMap) { - final Threshold threshold = new Threshold(); - threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName")); - threshold.setVersion(thresholdMap.get("policy.version")); - threshold.setFieldPath(thresholdMap.get("policy.fieldPath")); - threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction"))); - threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity"))); - threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue"))); - threshold.setClosedLoopEventStatus( - ControlLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus"))); - return threshold; - } - - /** - * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope - * extracted from given eventNamesEntry - * - * @param eventNamesEntry Event Names Entry - * - * @return new instance of MetricsPerEventName - */ - public static MetricsPerEventName createNewMetricsPerEventName( - final Map.Entry<String, Map<String, String>> eventNamesEntry) { - // determine event Name - final String eventName = eventNamesEntry.getKey(); - // determine event Name thresholds - final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue(); - final MetricsPerEventName metricsPerEventName = new MetricsPerEventName(); - final List<Threshold> thresholds = new LinkedList<>(); - metricsPerEventName.setThresholds(thresholds); - metricsPerEventName.setEventName(eventName); - // bind policyName, policyVersion, policyScope and closedLoopControlName - metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName")); - metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion")); - metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope")); - metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf( - metricsPerEventNameThresholdsMap.get("closedLoopControlName"))); - return metricsPerEventName; - } - - /** - * Converts a flattened key/value map which has keys delimited by a given delimiter. - * The start Index and end index extract the sub-key value and returns a new map containing - * sub-keys and values. - * - * @param actualMap actual Map - * @param startIndex start index - * @param endIndex end index - * @param delimiter delimiter - * - * @return Map with new sub tree map - */ - public static Map<String, Map<String, String>> extractSubTree( - final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) { - - final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>(); - - // iterate over actual map entries - for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) { - final String actualMapKey = actualMapEntry.getKey(); - final String actualMapValue = actualMapEntry.getValue(); - - // determine delimiter start and end index - final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex); - final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex); - final int keyLength = actualMapKey.length(); - - // extract sub-tree map - if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) { - final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex); - final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey); - final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength); - if (existingThresholdMap == null) { - Map<String, String> newThresholdMap = new LinkedHashMap<>(); - newThresholdMap.put(subMapKey, actualMapValue); - subTreeMap.put(thresholdKey, newThresholdMap); - } else { - existingThresholdMap.put(subMapKey, actualMapValue); - } - - } - } - - return subTreeMap; - - } - - - /** - * Provides a view of underlying map that filters out entries with keys starting with give prefix - * - * @param actualMap Target map that needs to be filtered - * @param keyNamePrefix key prefix - * - * @return a view of actual map which only show entries which have give prefix - */ - public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap, - final String keyNamePrefix) { - return Maps.filterKeys(actualMap, - new Predicate<String>() { - @Override - public boolean apply(@Nullable String key) { - return key != null && key.startsWith(keyNamePrefix); - } - }); - } - - - /** - * Creates Quartz Scheduler - * - * @param pollingIntervalMS polling interval - * @param stdSchedulerFactory Quartz standard schedule factory instance - * @param quartzPublisherPropertiesFileName quartz properties file name - * @param jobDataMap job Data map - * @param quartzJobClass Quartz Job Class - * @param quartzJobName Quartz Job Name - * @param quartzTriggerName Quartz Trigger name - * - * @param <T> An implementation of Quartz {@link Job} interface - * @return Configured Quartz Scheduler - * - * @throws SchedulerException exception if unable to create to Quartz Scheduler - */ - public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS, - final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName, - final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName, - final String quartzTriggerName) throws SchedulerException { - - // Initialize a new Quartz Standard scheduler - LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}", - quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName); - final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile( - quartzPublisherPropertiesFileName, new Properties()); - stdSchedulerFactory.initialize(quartzProperties); - final Scheduler scheduler = stdSchedulerFactory.getScheduler(); - - // Create a new job detail - final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName, - AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build(); - - // Create a new scheduling builder - final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule() - .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule - .repeatForever(); // repeats while worker is running - - // Create a trigger for the TCA Publisher Job - final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger() - .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME) - .startNow() // job starts right away - .withSchedule(simpleScheduleBuilder).build(); - - scheduler.scheduleJob(jobDetail, simpleTrigger); - LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName()); - return scheduler; - } - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.tca.utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.TypeRef;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
+import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
+import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
+import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
+import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.SimpleTrigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
+
+/**
+ * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
+ * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
+ * and TCA Policy
+ *
+ * @author Rajiv Singla . Creation Date: 10/24/2016.
+ */
+public abstract class TCAUtils extends AnalyticsModelJsonUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
+
+ /**
+ * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
+ * WARNING )
+ */
+ private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
+ @Override
+ public int compare(Threshold threshold1, Threshold threshold2) {
+ return threshold1.getSeverity().compareTo(threshold2.getSeverity());
+ }
+ };
+
+ /**
+ * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
+ *
+ * @return TCA Policy Metrics Per Event Name list
+ */
+ public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
+ return new Function<TCAPolicy, List<MetricsPerEventName>>() {
+ @Nullable
+ @Override
+ public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
+ return tcaPolicy.getMetricsPerEventName();
+ }
+ };
+ }
+
+ /**
+ * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
+ * {@link MetricsPerEventName}
+ *
+ * @return Event Names or a Metrics Per Event Name object
+ */
+ public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
+ return new Function<MetricsPerEventName, String>() {
+ @Override
+ public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
+ return metricsPerEventName.getEventName();
+ }
+ };
+ }
+
+
+ /**
+ * Extracts {@link TCAPolicy} Event Names
+ *
+ * @param tcaPolicy TCA Policy
+ * @return List of event names in the TCA Policy
+ */
+ public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
+ final List<MetricsPerEventName> metricsPerEventNames =
+ tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
+
+ return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
+ }
+
+ /**
+ * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
+ * change during runtime
+ *
+ * @param tcaPolicy TCA Policy
+ * @return a Supplier that memoize the TCA Policy event names
+ */
+ public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
+ return Suppliers.memoize(new Supplier<List<String>>() {
+ @Override
+ public List<String> get() {
+ return getPolicyEventNames(tcaPolicy);
+ }
+ });
+ }
+
+
+ /**
+ * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
+ *
+ * @param tcaPolicy TCA Policy
+ * @return A table with Keys of event name and field path containing List of threshold as values
+ */
+ public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
+ final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
+ for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
+ final String eventName = metricsPerEventName.getEventName();
+ final List<Threshold> thresholds = metricsPerEventName.getThresholds();
+ for (Threshold threshold : thresholds) {
+ final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
+ if (existingThresholds == null) {
+ final LinkedList<Threshold> newThresholdList = new LinkedList<>();
+ newThresholdList.add(threshold);
+ domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
+ } else {
+ domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
+ }
+ }
+ }
+ return domainFRTable;
+ }
+
+
+ /**
+ * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
+ *
+ * @param tcaPolicy TCA Policy
+ * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
+ */
+ public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
+ (final TCAPolicy tcaPolicy) {
+ return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
+ @Override
+ public Table<String, String, List<Threshold>> get() {
+ return getPolicyEventNameThresholdsTable(tcaPolicy);
+ }
+ });
+ }
+
+
+ /**
+ * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
+ * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
+ * filter out messages which does not match policy domain or event Name
+ *
+ * @param cefMessage CEF Message
+ * @param tcaPolicy TCA Policy
+ * @return Message Process Context after processing filter chain
+ */
+ public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
+ @Nonnull final TCAPolicy tcaPolicy) {
+
+ final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
+ final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
+ final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
+ // Create a list of message processors
+ final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
+ ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
+ final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
+ // Create a message processors chain
+ final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
+ new GenericMessageChainProcessor<>(messageProcessors, processorContext);
+ // process chain
+ return tcaProcessingChain.processChain();
+ }
+
+
+ /**
+ * Extracts json path values for given json Field Paths from using Json path notation. Assumes
+ * that values extracted are always long
+ *
+ * @param message CEF Message
+ * @param jsonFieldPaths Json Field Paths
+ * @return Map containing key as json path and values as values associated with that json path
+ */
+ public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
+ jsonFieldPaths) {
+
+ final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
+ final DocumentContext documentContext = JsonPath.parse(message);
+
+ for (String jsonFieldPath : jsonFieldPaths) {
+ final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
+ });
+ // If Json Field Values are not or empty
+ if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
+ // Filter out all null values in the filed values list
+ final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
+ Predicates.<Long>notNull()));
+ // If there are non null values put them in the map
+ if (!nonNullValues.isEmpty()) {
+ jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
+ }
+ }
+ }
+
+ return jsonFieldPathMap;
+ }
+
+ /**
+ * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
+ * it applies threshold in order of their severity and record the first threshold per message field path
+ *
+ * @param messageFieldValues Field Path Values extracted from CEF Message
+ * @param fieldThresholds Policy Thresholds for Field Path
+ * @return Optional of violated threshold for a field path
+ */
+ public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
+ fieldThresholds) {
+ // order thresholds by severity
+ Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
+ // Now apply each threshold to field values
+ for (Threshold fieldThreshold : fieldThresholds) {
+ for (Long messageFieldValue : messageFieldValues) {
+ final Boolean isThresholdViolated =
+ fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
+ if (isThresholdViolated) {
+ final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
+ violatedThreshold.setActualFieldValue(messageFieldValue);
+ return Optional.of(violatedThreshold);
+ }
+ }
+ }
+ return Optional.absent();
+ }
+
+ /**
+ * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
+ * Grabs first highest priority violated threshold
+ *
+ * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
+ * @return First Highest priority violated threshold
+ */
+ public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
+
+ final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
+
+ if (violatedThresholds.size() == 1) {
+ return violatedThresholds.get(0);
+ }
+ Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
+ // Just grab the first violated threshold with highest priority
+ return violatedThresholds.get(0);
+ }
+
+
+ /**
+ * Creates {@link MetricsPerEventName} object which contains violated thresholds
+ *
+ * @param tcaPolicy TCA Policy
+ * @param violatedThreshold Violated thresholds
+ * @param eventName Event Name
+ *
+ * @return MetricsPerEventName object containing one highest severity violated threshold
+ */
+ public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
+ @Nonnull final Threshold violatedThreshold,
+ @Nonnull final String eventName) {
+
+ final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
+ Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
+ @Override
+ public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
+ return metricsPerEventName.getEventName().equals(eventName);
+ }
+ }));
+ // TCA policy must have only one metrics per event Name
+ if (metricsPerEventNames.size() == 1) {
+ final MetricsPerEventName violatedMetrics =
+ MetricsPerEventName.copy(metricsPerEventNames.get(0));
+ violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
+ return violatedMetrics;
+ } else {
+ final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
+ }
+ }
+
+ /**
+ * Computes threshold violations
+ *
+ * @param processorContext Filtered processor Context
+ * @return processor context with any threshold violations
+ */
+ public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
+ final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
+ return policyThresholdsProcessor.apply(processorContext);
+ }
+
+
+ /**
+ * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
+ * formats
+ *
+ * @param processorContextWithViolations processor context which has TCA violations
+ * @param tcaAppName tca app name
+ * @param isAlertInCEFFormat determines if output alert is in CEF format
+ *
+ * @return TCA Alert String
+ *
+ * @throws JsonProcessingException If alert cannot be parsed into JSON String
+ */
+ public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
+ final String tcaAppName,
+ final Boolean isAlertInCEFFormat) throws JsonProcessingException {
+ if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
+ final EventListener eventListenerWithViolations =
+ addThresholdViolationFields(processorContextWithViolations);
+ final String alertString = writeValueAsString(eventListenerWithViolations);
+ LOG.debug("Created alert in CEF Format: {}", alertString);
+ return alertString;
+ } else {
+ final TCAVESResponse newTCAVESResponse =
+ createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
+ final String alertString = writeValueAsString(newTCAVESResponse);
+ LOG.debug("Created alert in Non CEF Format: {}", alertString);
+ return alertString;
+ }
+ }
+
+ /**
+ * Adds threshold violation fields to {@link EventListener}
+ *
+ * @param processorContextWithViolations processor context that contains violations
+ * @return event listener with threshold crossing alert fields populated
+ */
+ public static EventListener addThresholdViolationFields(
+ final TCACEFProcessorContext processorContextWithViolations) {
+
+ final MetricsPerEventName metricsPerEventName =
+ processorContextWithViolations.getMetricsPerEventName();
+ // confirm violations are indeed present
+ if (metricsPerEventName == null) {
+ final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+
+ // get violated threshold
+ final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
+ final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
+ final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
+
+ // create new threshold crossing alert fields
+ final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
+ thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
+ thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
+ thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
+ thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
+ thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
+ thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
+ thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
+ thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
+
+ // create new performance count
+ final PerformanceCounter performanceCounter = new PerformanceCounter();
+ performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
+ performanceCounter.setName(violatedThreshold.getFieldPath());
+ performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
+ performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
+
+ // set additional parameters for threshold crossing alert fields
+ thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
+
+ // add threshold crossing fields to existing event listener
+ eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
+
+ return eventListener;
+ }
+
+ /**
+ * Converts {@link EventSeverity} to {@link Criticality}
+ *
+ * @param eventSeverity event severity
+ *
+ * @return performance counter criticality
+ */
+ private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
+ switch (eventSeverity) {
+ case CRITICAL:
+ return Criticality.CRIT;
+ case MAJOR:
+ return Criticality.MAJ;
+ default:
+ return Criticality.UNKNOWN;
+ }
+ }
+
+ /**
+ * Creates {@link TCAVESResponse} object
+ *
+ * @param processorContext processor Context with violations
+ * @param tcaAppName TCA App Name
+ *
+ * @return TCA VES Response Message
+ */
+ public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
+ final String tcaAppName) {
+
+ final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
+ // confirm violations are indeed present
+ if (metricsPerEventName == null) {
+ final String errorMessage = "No violations metrics. Unable to create VES Response";
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
+ }
+
+ final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
+ final EventListener eventListener = processorContext.getCEFEventListener();
+ final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
+
+ final TCAVESResponse tcavesResponse = new TCAVESResponse();
+ // ClosedLoopControlName included in the DCAE configuration Policy
+ tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
+ // version included in the DCAE configuration Policy
+ tcavesResponse.setVersion(violatedThreshold.getVersion());
+ // Generate a UUID for this output message
+ tcavesResponse.setRequestID(UUID.randomUUID().toString());
+ // commonEventHeader.startEpochMicrosec from the received VES message
+ tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
+ // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts
+ if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {
+ tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());
+ }
+ // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
+ tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
+
+ final AAI aai = new AAI();
+ tcavesResponse.setAai(aai);
+
+ // VM specific settings
+ if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
+ // Hard Coded - "VM"
+ tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
+ // Hard Coded - "vserver.vserver-name"
+ tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
+ // commonEventHeader.sourceName from the received VES message
+ aai.setGenericServerId(commonEventHeader.getSourceName());
+ } else {
+ // VNF specific settings
+ // Hard Coded - "VNF"
+ tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
+ // Hard Coded - "generic-vnf.vnf-id"
+ tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
+ // commonEventHeader.sourceName from the received VES message
+ aai.setGenericVNFId(commonEventHeader.getSourceName());
+ }
+
+ // Hard Coded - "DCAE"
+ tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
+ // policyScope included in the DCAE configuration Policy
+ tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
+ // policyName included in the DCAE configuration Policy
+ tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
+ // policyVersion included in the DCAE configuration Policy
+ tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
+ // Extracted from violated threshold
+ tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
+
+ return tcavesResponse;
+ }
+
+
+ /**
+ * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
+ *
+ * @param tcavesResponse alert
+ *
+ * @return control Loop Schema Type
+ */
+ public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {
+ final AAI aai = tcavesResponse.getAai();
+ if (aai.getGenericServerId() != null) {
+ return ControlLoopSchemaType.VM;
+ } else {
+ return ControlLoopSchemaType.VNF;
+ }
+ }
+
+ /**
+ * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
+ *
+ * @param tcavesResponse {@link TCAVESResponse} TCA alert
+ *
+ * @return Source name
+ */
+ public static String determineSourceName(final TCAVESResponse tcavesResponse) {
+ final AAI aai = tcavesResponse.getAai();
+ if (aai.getGenericServerId() != null) {
+ return aai.getGenericServerId();
+ } else {
+ return aai.getGenericVNFId();
+ }
+ }
+
+
+ /**
+ * Extract Domain and Event Name from processor context if present
+ *
+ * @param processorContext processor context
+ * @return Tuple of domain and event Name
+ */
+ public static Pair<String, String> getDomainAndEventName(
+ @Nullable final TCACEFProcessorContext processorContext) {
+
+ String domain = null;
+ String eventName = null;
+
+ if (processorContext != null &&
+ processorContext.getCEFEventListener() != null &&
+ processorContext.getCEFEventListener().getEvent() != null &&
+ processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
+ final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
+ .getCommonEventHeader();
+
+ if (commonEventHeader.getDomain() != null) {
+ domain = commonEventHeader.getDomain().name();
+ }
+
+ if (commonEventHeader.getEventName() != null) {
+ eventName = commonEventHeader.getEventName();
+ }
+
+ }
+
+ return new ImmutablePair<>(domain, eventName);
+
+ }
+
+ /**
+ * Creates {@link TCAPolicy} Metrics per Event Name list
+ *
+ * @param eventNamesMap Map containing event Name as key and corresponding values
+ *
+ * @return List of {@link MetricsPerEventName}
+ */
+ public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
+ final Map<String, Map<String, String>> eventNamesMap) {
+
+ // create a new metrics per event Name list
+ final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
+
+ for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
+
+ // create new metrics per event Name instance
+ final MetricsPerEventName newMetricsPerEventName =
+ createNewMetricsPerEventName(eventNamesEntry);
+ metricsPerEventNames.add(newMetricsPerEventName);
+
+ // determine all threshold related values
+ final Map<String, String> thresholdsValuesMaps =
+ filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
+ AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
+
+ // create a map of all threshold values
+ final Map<String, Map<String, String>> thresholdsMap =
+ extractSubTree(thresholdsValuesMaps, 1, 2,
+ AnalyticsConstants.TCA_POLICY_DELIMITER);
+
+ // add thresholds to nmetrics per event Names threshold list
+ for (Map<String, String> thresholdMap : thresholdsMap.values()) {
+ newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
+ }
+
+ }
+
+ return metricsPerEventNames;
+ }
+
+ /**
+ * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
+ *
+ * @param thresholdMap threshold map with threshold values
+ *
+ * @return new instance of TCA Policy Threshold
+ */
+ public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
+ final Threshold threshold = new Threshold();
+ threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
+ threshold.setVersion(thresholdMap.get("policy.version"));
+ threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
+ threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
+ threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
+ threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
+ threshold.setClosedLoopEventStatus(
+ ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
+ return threshold;
+ }
+
+ /**
+ * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
+ * extracted from given eventNamesEntry
+ *
+ * @param eventNamesEntry Event Names Entry
+ *
+ * @return new instance of MetricsPerEventName
+ */
+ public static MetricsPerEventName createNewMetricsPerEventName(
+ final Map.Entry<String, Map<String, String>> eventNamesEntry) {
+ // determine event Name
+ final String eventName = eventNamesEntry.getKey();
+ // determine event Name thresholds
+ final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
+ final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
+ final List<Threshold> thresholds = new LinkedList<>();
+ metricsPerEventName.setThresholds(thresholds);
+ metricsPerEventName.setEventName(eventName);
+ // bind policyName, policyVersion, policyScope and closedLoopControlName
+ metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
+ metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
+ metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
+ metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
+ metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));
+ return metricsPerEventName;
+ }
+
+ /**
+ * Converts a flattened key/value map which has keys delimited by a given delimiter.
+ * The start Index and end index extract the sub-key value and returns a new map containing
+ * sub-keys and values.
+ *
+ * @param actualMap actual Map
+ * @param startIndex start index
+ * @param endIndex end index
+ * @param delimiter delimiter
+ *
+ * @return Map with new sub tree map
+ */
+ public static Map<String, Map<String, String>> extractSubTree(
+ final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
+
+ final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
+
+ // iterate over actual map entries
+ for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
+ final String actualMapKey = actualMapEntry.getKey();
+ final String actualMapValue = actualMapEntry.getValue();
+
+ // determine delimiter start and end index
+ final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
+ final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
+ final int keyLength = actualMapKey.length();
+
+ // extract sub-tree map
+ if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
+ final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
+ final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
+ final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
+ if (existingThresholdMap == null) {
+ Map<String, String> newThresholdMap = new LinkedHashMap<>();
+ newThresholdMap.put(subMapKey, actualMapValue);
+ subTreeMap.put(thresholdKey, newThresholdMap);
+ } else {
+ existingThresholdMap.put(subMapKey, actualMapValue);
+ }
+
+ }
+ }
+
+ return subTreeMap;
+
+ }
+
+
+ /**
+ * Provides a view of underlying map that filters out entries with keys starting with give prefix
+ *
+ * @param actualMap Target map that needs to be filtered
+ * @param keyNamePrefix key prefix
+ *
+ * @return a view of actual map which only show entries which have give prefix
+ */
+ public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
+ final String keyNamePrefix) {
+ return Maps.filterKeys(actualMap,
+ new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String key) {
+ return key != null && key.startsWith(keyNamePrefix);
+ }
+ });
+ }
+
+
+ /**
+ * Creates Quartz Scheduler
+ *
+ * @param pollingIntervalMS polling interval
+ * @param stdSchedulerFactory Quartz standard schedule factory instance
+ * @param quartzPublisherPropertiesFileName quartz properties file name
+ * @param jobDataMap job Data map
+ * @param quartzJobClass Quartz Job Class
+ * @param quartzJobName Quartz Job Name
+ * @param quartzTriggerName Quartz Trigger name
+ *
+ * @param <T> An implementation of Quartz {@link Job} interface
+ * @return Configured Quartz Scheduler
+ *
+ * @throws SchedulerException exception if unable to create to Quartz Scheduler
+ */
+ public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
+ final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
+ final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
+ final String quartzTriggerName) throws SchedulerException {
+
+ // Initialize a new Quartz Standard scheduler
+ LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
+ quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
+ final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
+ quartzPublisherPropertiesFileName, new Properties());
+ stdSchedulerFactory.initialize(quartzProperties);
+ final Scheduler scheduler = stdSchedulerFactory.getScheduler();
+
+ // Create a new job detail
+ final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
+ AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
+
+ // Create a new scheduling builder
+ final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
+ .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
+ .repeatForever(); // repeats while worker is running
+
+ // Create a trigger for the TCA Publisher Job
+ final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
+ .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
+ .startNow() // job starts right away
+ .withSchedule(simpleScheduleBuilder).build();
+
+ scheduler.scheduleJob(jobDetail, simpleTrigger);
+ LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
+ return scheduler;
+ }
+
+
+ /**
+ * Does A&AI Enrichment for VM
+ *
+ * @param tcavesResponse Outgoing alert object
+ * @param aaiEnrichmentClient A&AI Enrichment client
+ * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path
+ * @param alertString alert String
+ * @param vmSourceName vm source name
+ */
+ public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,
+ final AAIEnrichmentClient aaiEnrichmentClient,
+ final String aaiVMEnrichmentAPIPath,
+ final String alertString,
+ final String vmSourceName) {
+
+ final String filterString = "vserver-name:EQUALS:" + vmSourceName;
+ final ImmutableMap<String, String> queryParams = ImmutableMap.of(
+ "search-node-type", "vserver", "filter", filterString);
+
+ // fetch vm object resource Link from A&AI
+ final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(
+ aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
+ final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);
+
+ if (vmObjectResourceLink == null) {
+ LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +
+ "determined for vmSourceName: {}.", alertString, vmSourceName);
+ } else {
+
+ LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",
+ vmSourceName, vmObjectResourceLink);
+
+ // fetch vm A&AI Enrichment
+ final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
+ vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());
+
+ // enrich AAI
+ enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,
+ AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);
+ }
+
+
+ }
+
+
+ /**
+ * Does A&AI Enrichment for VNF
+ *
+ * @param tcavesResponse Outgoing alert object
+ * @param aaiEnrichmentClient A&AI Enrichment client
+ * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path
+ * @param alertString alert String
+ * @param vnfSourceName vnf source name
+ */
+ public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,
+ final AAIEnrichmentClient aaiEnrichmentClient,
+ final String aaiVNFEnrichmentAPIPath,
+ final String alertString,
+ final String vnfSourceName) {
+ final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);
+
+ // fetch vnf A&AI Enrichment
+ final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
+ aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
+
+ // enrich alert AAI
+ enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);
+ }
+
+ /**
+ * Fetches VM Object Resource Link from A&AI Resource Link Json
+ *
+ * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json
+ *
+ * @return object resource link String
+ */
+ private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {
+ if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {
+ try {
+ final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);
+ final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");
+ if (!resourceLinkJsonNode.isMissingNode()) {
+ return resourceLinkJsonNode.asText();
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",
+ vmAAIResourceLinkDetails, e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Creates Http Headers for A&AI Enrichment client
+ *
+ * @return Http Headers Map for A&AI Enrichment client
+ */
+ private static Map<String, String> createAAIEnrichmentHeaders() {
+ final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();
+ final String transactionId = Long.toString(new Date().getTime());
+ aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");
+ aaiEnrichmentHeaders.put("X-TransactionId", transactionId);
+ aaiEnrichmentHeaders.put("Accept", "application/json");
+ aaiEnrichmentHeaders.put("Real-Time", "true");
+ aaiEnrichmentHeaders.put("Content-Type", "application/json");
+ return aaiEnrichmentHeaders;
+ }
+
+
+ /**
+ * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object
+ *
+ * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details
+ * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String
+ * @param alertString Alert String
+ * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record
+ */
+ private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,
+ final String alertString, final String keyPrefix) {
+
+ if (aaiEnrichmentDetails == null) {
+ LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +
+ "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);
+
+ } else {
+
+ final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);
+
+ if (enrichmentDetailsAAI != null) {
+ final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =
+ enrichmentDetailsAAI.getDynamicProperties().entrySet();
+ final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();
+
+ // populate A&AI Enrichment details and add prefix to key
+ for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {
+ preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),
+ enrichedAAIEntry.getValue());
+ }
+
+ LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",
+ alertString, preEnrichmentAAI);
+ } else {
+ LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +
+ "Skipping Enrichment for alert message: {}",
+ preEnrichmentAAI, aaiEnrichmentDetails, alertString);
+ }
+ }
+
+ }
+
+ /**
+ * Creates a new A&AI object with only top level A&AI Enrichment details
+ *
+ * @param aaiEnrichmentDetails A&AI Enrichment details
+ *
+ * @return new A&AI with only top level A&AI Enrichment details
+ */
+ private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {
+ try {
+ final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);
+ final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
+ while (fieldsIterator.hasNext()) {
+ final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();
+ final JsonNode jsonNode = fieldEntry.getValue();
+ // remove all arrays, objects from A&AI Enrichment Json
+ if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {
+ fieldsIterator.remove();
+ }
+ }
+ return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);
+ } catch (IOException e) {
+ LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);
+ }
+ return null;
+ }
+
+}
|