diff options
Diffstat (limited to 'dcae-analytics-cdap-common')
12 files changed, 577 insertions, 163 deletions
diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/CDAPComponentsConstants.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/CDAPComponentsConstants.java index d00bc1b..3121c0e 100644 --- a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/CDAPComponentsConstants.java +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/CDAPComponentsConstants.java @@ -158,6 +158,22 @@ public abstract class CDAPComponentsConstants { /** + * Fixed Name for TCA VES Alerts Abatement Flowlet + */ + public static final String TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET = "TCAVESAlertsAbatementFlowlet"; + + /** + * Fixed Description for TCA VES Alerts Abatement Flowlet + */ + public static final String TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET = + "Determines if abatement event needs to be posted to downstream systems"; + + /** + * Fixed Name for TCA VES Alerts Abatement Flowlet output + */ + public static final String TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT = "TCAVESAlertsAbatementFlowletOutput"; + + /** * Fixed Name for TCA VES Alerts Sink Flowlet */ public static final String TCA_FIXED_VES_ALERTS_SINK_NAME_FLOWLET = "TCAVESAlertsSinkFlowlet"; @@ -189,9 +205,20 @@ public abstract class CDAPComponentsConstants { /** * Fixed Description for TCA VES Alerts table which contains alerts that can be send to downstream systems */ - public static final String TCA_DEFAULT_VES_ALERTS_DESCRIPTION_TABLE = + public static final String TCA_FIXED_VES_ALERTS_DESCRIPTION_TABLE = "Stores alert messages that need to be DMaaP"; + /** + * Default Name for TCA Alerts abatement table which contains information to send out abated alerts + */ + public static final String TCA_DEFAULT_ALERTS_ABATEMENT_NAME_TABLE = "TCAAlertsAbatementTable"; + + /** + * Fixed Description for TCA Alerts abatement table which contains information to determine abatement alerts + */ + public static final String TCA_FIXED_ALERTS_ABATEMENT_DESCRIPTION_TABLE = + "Stores information to determine creation of abatement alerts"; + private CDAPComponentsConstants() { diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/domain/tca/ThresholdCalculatorOutput.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/domain/tca/ThresholdCalculatorOutput.java new file mode 100644 index 0000000..786faca --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/domain/tca/ThresholdCalculatorOutput.java @@ -0,0 +1,94 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.common.domain.tca; + +import com.google.common.base.Objects; + +import java.io.Serializable; + +/** + * Simple POJO emitted by threshold calculator + * + * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017. + */ +public class ThresholdCalculatorOutput implements Serializable { + + private static final long serialVersionUID = 1L; + + private String cefMessage; + private String tcaPolicy; + private String violatedMetricsPerEventName; + private String alertMessage; + + public ThresholdCalculatorOutput() { + // no arg constructor + } + + public ThresholdCalculatorOutput(String cefMessage, String tcaPolicy, + String violatedMetricsPerEventName, String alertMessage) { + this.cefMessage = cefMessage; + this.tcaPolicy = tcaPolicy; + this.violatedMetricsPerEventName = violatedMetricsPerEventName; + this.alertMessage = alertMessage; + } + + public String getCefMessage() { + return cefMessage; + } + + public void setCefMessage(String cefMessage) { + this.cefMessage = cefMessage; + } + + public String getTcaPolicy() { + return tcaPolicy; + } + + public void setTcaPolicy(String tcaPolicy) { + this.tcaPolicy = tcaPolicy; + } + + public String getViolatedMetricsPerEventName() { + return violatedMetricsPerEventName; + } + + public void setViolatedMetricsPerEventName(String violatedMetricsPerEventName) { + this.violatedMetricsPerEventName = violatedMetricsPerEventName; + } + + public String getAlertMessage() { + return alertMessage; + } + + public void setAlertMessage(String alertMessage) { + this.alertMessage = alertMessage; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("cefMessage", cefMessage) + .add("tcaPolicy", tcaPolicy) + .add("violatedMetricsPerEventName", violatedMetricsPerEventName) + .add("alertMessage", alertMessage) + .toString(); + } +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java new file mode 100644 index 0000000..6f71801 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java @@ -0,0 +1,157 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca; + +import com.google.common.base.Objects; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +/** + * TCA Alerts Abatement Entity is used to persist information to determine if abatement event need to sent to downstream + * systems + * + * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017. + */ +public class TCAAlertsAbatementEntity implements Writable, Serializable { + + private static final long serialVersionUID = 1L; + + private long creationTS; + private String requestId; + // Kept as string to avoid null checks + private String abatementSentTS; + + /** + * No Arg constructor required for Jackson Json Serialization / Deserialization + */ + public TCAAlertsAbatementEntity() { + // required no arg constructor + } + + /** + * Creates TCA Alerts Abatement Entity to persist information to determine if abatement alerts need to be posted + * + * @param creationTS record creation time + * @param requestId request ID of generated alert + * @param abatementSentTS time when abatement was sent out for that alert if any + */ + public TCAAlertsAbatementEntity(long creationTS, String requestId, String abatementSentTS) { + this.creationTS = creationTS; + this.requestId = requestId; + this.abatementSentTS = abatementSentTS; + } + + /** + * Timestamp when record was created + * + * @return timestamp when record was created + */ + public long getCreationTS() { + return creationTS; + } + + /** + * Set value for timestamp when record was created + * + * @param creationTS new value for timestamp when record was created + */ + public void setCreationTS(long creationTS) { + this.creationTS = creationTS; + } + + /** + * Request Id of ONSET alert which was sent + * + * @return request Id of ONSET alert which was sent + */ + public String getRequestId() { + return requestId; + } + + /** + * Set Request Id of ONSET alert + * + * @param requestId set new value for ONSET alert request id + */ + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + + /** + * Get abatement Sent Timestamp + * + * @return get abatement alert sent timestamp + */ + public String getAbatementSentTS() { + return abatementSentTS; + } + + /** + * Set timestamp when abatement alert is sent + * + * @param abatementSentTS sent new value for timestamp when abatement alert is sent + */ + public void setAbatementSentTS(String abatementSentTS) { + this.abatementSentTS = abatementSentTS; + } + + /** + * Write entity to Table + * + * @param dataOutput data output + * @throws IOException io exception + */ + @Override + public void write(DataOutput dataOutput) throws IOException { + WritableUtils.writeVLong(dataOutput, creationTS); + WritableUtils.writeString(dataOutput, requestId); + WritableUtils.writeString(dataOutput, abatementSentTS); + } + + /** + * Read entity from table + * + * @param dataInput data input + * @throws IOException io exception + */ + @Override + public void readFields(DataInput dataInput) throws IOException { + creationTS = WritableUtils.readVLong(dataInput); + requestId = WritableUtils.readString(dataInput); + abatementSentTS = WritableUtils.readString(dataInput); + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("creationTS", creationTS) + .add("requestId", requestId) + .add("abatementSentTS", abatementSentTS) + .toString(); + } +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java new file mode 100644 index 0000000..318fca8 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java @@ -0,0 +1,127 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca; + +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.data.schema.UnsupportedTypeException; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.IndexedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; + +import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; + +/** + * Utility methods to persist TCA Alerts Abatement information + * + * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017. + */ +public abstract class TCAAlertsAbatementPersister { + + private static final Logger LOG = LoggerFactory.getLogger(TCAAlertsAbatementPersister.class); + + private static final Joiner KEY_JOINER = Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER); + + private TCAAlertsAbatementPersister() { + // private constructor + } + + /** + * Creates {@link DatasetProperties} for Alerts Table + * + * @param timeToLiveSeconds alerts table Time to Live + * + * @return Alerts Abatement table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + try { + return ObjectMappedTableProperties.builder() + .setType(TCAAlertsAbatementEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(CDAPComponentsConstants.TCA_FIXED_ALERTS_ABATEMENT_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAAlertsAbatementEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + + public static void persist(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName, + final TCAVESResponse tcavesResponse, + final String abatementTS, + final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable) { + final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); + + final long currentTimestamp = new Date().getTime(); + final String requestID = tcavesResponse.getRequestID(); + final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp, + requestID, abatementTS); + tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity); + + LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey); + + } + + public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName, + final ObjectMappedTable<TCAAlertsAbatementEntity> + tcaAlertsAbatementTable) { + final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); + return tcaAlertsAbatementTable.read(abatementTableKey); + } + + + public static String createKey(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName) { + // no null check required as all are required fields + final String eventName = violatedMetricsPerEventName.getEventName(); + final String sourceName = eventListener.getEvent().getCommonEventHeader().getSourceName(); + final String reportingEntityName = eventListener.getEvent().getCommonEventHeader().getReportingEntityName(); + // violated threshold will always be present + final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); + final String closedLoopControlName = violatedThreshold.getClosedLoopControlName(); + final String fieldPath = violatedThreshold.getFieldPath(); + + final List<String> abatementKeyList = + ImmutableList.of(eventName, sourceName, reportingEntityName, closedLoopControlName, fieldPath); + + return KEY_JOINER.join(abatementKeyList); + } + +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java index c529e55..d6b3099 100644 --- a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java @@ -22,12 +22,11 @@ package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; -import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyFunctionalRoleFilter; +import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; import java.io.DataInput; @@ -49,7 +48,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { private String messageType; private String vesMessage; private String domain; - private String functionalRole; + private String eventName; private String thresholdPath; private String thresholdSeverity; private String thresholdDirection; @@ -58,8 +57,8 @@ public class TCAMessageStatusEntity implements Writable, Serializable { private String jsonProcessorMessage; private String domainFilterStatus; private String domainFilterMessage; - private String functionalRoleFilterStatus; - private String functionalRoleFilterMessage; + private String eventNameFilterStatus; + private String eventNameFilterMessage; private String thresholdCalculatorStatus; private String thresholdCalculatorMessage; private String alertMessage; @@ -79,11 +78,11 @@ public class TCAMessageStatusEntity implements Writable, Serializable { * @param messageType {@link TCACalculatorMessageType} * @param vesMessage incoming VES message from collector * @param domain VES message domain if present - * @param functionalRole VES message functional role if present + * @param eventName VES message functional role if present */ public TCAMessageStatusEntity(final long creationTS, final int instanceId, final String messageType, - final String vesMessage, final String domain, final String functionalRole) { - this(creationTS, instanceId, messageType, vesMessage, domain, functionalRole, null, null, null, null, + final String vesMessage, final String domain, final String eventName) { + this(creationTS, instanceId, messageType, vesMessage, domain, eventName, null, null, null, null, null, null, null, null, null, null, null, null, null); } @@ -96,7 +95,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { * @param messageType {@link TCACalculatorMessageType} * @param vesMessage incoming VES message from collector * @param domain VES message domain if present - * @param functionalRole VES message functional role if present + * @param eventName VES message event name if present * @param thresholdPath Violated threshold path * @param thresholdSeverity Violated threshold Severity if any * @param thresholdDirection Violated threshold Direction if any @@ -105,23 +104,19 @@ public class TCAMessageStatusEntity implements Writable, Serializable { * @param jsonProcessorMessage {@link TCACEFJsonProcessor} message * @param domainFilterStatus {@link TCACEFPolicyDomainFilter} status * @param domainFilterMessage {@link TCACEFPolicyDomainFilter} message - * @param functionalRoleFilterStatus - * {@link TCACEFPolicyFunctionalRoleFilter} status - * @param functionalRoleFilterMessage - * {@link TCACEFPolicyFunctionalRoleFilter} message - * @param thresholdCalculatorStatus - * {@link TCACEFPolicyThresholdsProcessor} status - * @param thresholdCalculatorMessage - * {@link TCACEFPolicyThresholdsProcessor} message + * @param eventNameFilterStatus {@link TCACEFPolicyEventNameFilter} status + * @param eventNameFilterMessage {@link TCACEFPolicyEventNameFilter} message + * @param thresholdCalculatorStatus {@link TCACEFPolicyThresholdsProcessor} status + * @param thresholdCalculatorMessage {@link TCACEFPolicyThresholdsProcessor} message * @param alertMessage alert message that will be sent out in case of threshold violation */ public TCAMessageStatusEntity(long creationTS, int instanceId, String messageType, String vesMessage, - String domain, String functionalRole, + String domain, String eventName, String thresholdPath, String thresholdSeverity, String thresholdDirection, Long thresholdValue, String jsonProcessorStatus, String jsonProcessorMessage, String domainFilterStatus, String domainFilterMessage, - String functionalRoleFilterStatus, String functionalRoleFilterMessage, + String eventNameFilterStatus, String eventNameFilterMessage, String thresholdCalculatorStatus, String thresholdCalculatorMessage, String alertMessage) { this.creationTS = creationTS; @@ -129,7 +124,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { this.messageType = messageType; this.vesMessage = vesMessage; this.domain = domain; - this.functionalRole = functionalRole; + this.eventName = eventName; this.thresholdPath = thresholdPath; this.thresholdSeverity = thresholdSeverity; this.thresholdDirection = thresholdDirection; @@ -138,8 +133,8 @@ public class TCAMessageStatusEntity implements Writable, Serializable { this.jsonProcessorMessage = jsonProcessorMessage; this.domainFilterStatus = domainFilterStatus; this.domainFilterMessage = domainFilterMessage; - this.functionalRoleFilterStatus = functionalRoleFilterStatus; - this.functionalRoleFilterMessage = functionalRoleFilterMessage; + this.eventNameFilterStatus = eventNameFilterStatus; + this.eventNameFilterMessage = eventNameFilterMessage; this.thresholdCalculatorStatus = thresholdCalculatorStatus; this.thresholdCalculatorMessage = thresholdCalculatorMessage; this.alertMessage = alertMessage; @@ -237,21 +232,21 @@ public class TCAMessageStatusEntity implements Writable, Serializable { } /** - * Provides VES Message Functional Role + * Provides VES Message Event Name * - * @return ves message functional role + * @return ves message Event Name */ - public String getFunctionalRole() { - return functionalRole; + public String getEventName() { + return eventName; } /** * Sets VES Message Functional Role * - * @param functionalRole ves message Functional Role + * @param eventName ves message Functional Role */ - public void setFunctionalRole(String functionalRole) { - this.functionalRole = functionalRole; + public void setEventName(String eventName) { + this.eventName = eventName; } /** @@ -273,7 +268,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { } /** - * Violated threshold {@link EventSeverity} + * Violated threshold Event Severity * * @return event severity */ @@ -398,35 +393,35 @@ public class TCAMessageStatusEntity implements Writable, Serializable { this.domainFilterMessage = domainFilterMessage; } - public String getFunctionalRoleFilterStatus() { - return functionalRoleFilterStatus; + public String getEventNameFilterStatus() { + return eventNameFilterStatus; } /** - * Provides {@link TCACEFPolicyFunctionalRoleFilter} status + * Provides {@link TCACEFPolicyEventNameFilter} status * - * @param functionalRoleFilterStatus functional Role filter status + * @param eventNameFilterStatus functional Role filter status */ - public void setFunctionalRoleFilterStatus(String functionalRoleFilterStatus) { - this.functionalRoleFilterStatus = functionalRoleFilterStatus; + public void setEventNameFilterStatus(String eventNameFilterStatus) { + this.eventNameFilterStatus = eventNameFilterStatus; } /** - * Provides {@link TCACEFPolicyFunctionalRoleFilter} message + * Provides {@link TCACEFPolicyEventNameFilter} message * * @return functional role filter message */ - public String getFunctionalRoleFilterMessage() { - return functionalRoleFilterMessage; + public String getEventNameFilterMessage() { + return eventNameFilterMessage; } /** * Sets Functional Role filter message * - * @param functionalRoleFilterMessage functional role filter message + * @param eventNameFilterMessage functional role filter message */ - public void setFunctionalRoleFilterMessage(String functionalRoleFilterMessage) { - this.functionalRoleFilterMessage = functionalRoleFilterMessage; + public void setEventNameFilterMessage(String eventNameFilterMessage) { + this.eventNameFilterMessage = eventNameFilterMessage; } /** @@ -498,7 +493,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { WritableUtils.writeString(dataOutput, vesMessage); WritableUtils.writeString(dataOutput, domain); - WritableUtils.writeString(dataOutput, functionalRole); + WritableUtils.writeString(dataOutput, eventName); WritableUtils.writeString(dataOutput, thresholdPath); WritableUtils.writeString(dataOutput, thresholdSeverity); @@ -509,8 +504,8 @@ public class TCAMessageStatusEntity implements Writable, Serializable { WritableUtils.writeString(dataOutput, jsonProcessorMessage); WritableUtils.writeString(dataOutput, domainFilterStatus); WritableUtils.writeString(dataOutput, domainFilterMessage); - WritableUtils.writeString(dataOutput, functionalRoleFilterStatus); - WritableUtils.writeString(dataOutput, functionalRoleFilterMessage); + WritableUtils.writeString(dataOutput, eventNameFilterStatus); + WritableUtils.writeString(dataOutput, eventNameFilterMessage); WritableUtils.writeString(dataOutput, thresholdCalculatorStatus); WritableUtils.writeString(dataOutput, thresholdCalculatorMessage); @@ -532,7 +527,7 @@ public class TCAMessageStatusEntity implements Writable, Serializable { vesMessage = WritableUtils.readString(dataInput); domain = WritableUtils.readString(dataInput); - functionalRole = WritableUtils.readString(dataInput); + eventName = WritableUtils.readString(dataInput); thresholdPath = WritableUtils.readString(dataInput); thresholdSeverity = WritableUtils.readString(dataInput); @@ -543,8 +538,8 @@ public class TCAMessageStatusEntity implements Writable, Serializable { jsonProcessorMessage = WritableUtils.readString(dataInput); domainFilterStatus = WritableUtils.readString(dataInput); domainFilterMessage = WritableUtils.readString(dataInput); - functionalRoleFilterStatus = WritableUtils.readString(dataInput); - functionalRoleFilterMessage = WritableUtils.readString(dataInput); + eventNameFilterStatus = WritableUtils.readString(dataInput); + eventNameFilterMessage = WritableUtils.readString(dataInput); thresholdCalculatorStatus = WritableUtils.readString(dataInput); thresholdCalculatorMessage = WritableUtils.readString(dataInput); diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java index 0f4c539..cb63ffb 100644 --- a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java @@ -34,11 +34,11 @@ import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeEx import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor; import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext; import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; -import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyFunctionalRoleFilter; +import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils; @@ -102,12 +102,12 @@ public abstract class TCAMessageStatusPersister { final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage()); // Find Functional Role and domain - final Pair<String, String> domainAndFunctionalRole = TCAUtils.getDomainAndFunctionalRole(processorContext); - final String domain = domainAndFunctionalRole.getLeft(); - final String functionalRole = domainAndFunctionalRole.getRight(); + final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext); + final String domain = domainAndEventName.getLeft(); + final String eventName = domainAndEventName.getRight(); final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS, - instanceId, calculatorMessageType.name(), vesMessage, domain, functionalRole); + instanceId, calculatorMessageType.name(), vesMessage, domain, eventName); // add threshold violation fields addViolatedThreshold(tcaMessageStatusEntity, processorContext); @@ -161,13 +161,13 @@ public abstract class TCAMessageStatusPersister { public static TCAMessageStatusEntity addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) { - final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext.getMetricsPerFunctionalRole(); + final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName(); - if (metricsPerFunctionalRole != null - && metricsPerFunctionalRole.getThresholds() != null - && metricsPerFunctionalRole.getThresholds().get(0) != null) { + if (metricsPerEventName != null + && metricsPerEventName.getThresholds() != null + && metricsPerEventName.getThresholds().get(0) != null) { - final Threshold threshold = metricsPerFunctionalRole.getThresholds().get(0); + final Threshold threshold = metricsPerEventName.getThresholds().get(0); tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath()); tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name()); tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name()); @@ -209,9 +209,9 @@ public abstract class TCAMessageStatusPersister { tcaMessageStatusEntity.setDomainFilterMessage(processingMessage); } - if (messageProcessor.getClass().equals(TCACEFPolicyFunctionalRoleFilter.class)) { - tcaMessageStatusEntity.setFunctionalRoleFilterStatus(processingState); - tcaMessageStatusEntity.setFunctionalRoleFilterMessage(processingMessage); + if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) { + tcaMessageStatusEntity.setEventNameFilterStatus(processingState); + tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage); } if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) { diff --git a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java index 18293c7..36e3496 100644 --- a/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java +++ b/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java @@ -79,7 +79,7 @@ public abstract class TCAVESAlertsPersister { .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) .setRowKeyExploreType(Schema.Type.STRING) .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) - .setDescription(CDAPComponentsConstants.TCA_DEFAULT_VES_ALERTS_DESCRIPTION_TABLE) + .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_DESCRIPTION_TABLE) .build(); } catch (UnsupportedTypeException e) { final String errorMessage = "Unable to convert TCAVESAlertEntity class to Schema"; diff --git a/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/BaseAnalyticsCDAPCommonUnitTest.java b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/BaseAnalyticsCDAPCommonUnitTest.java index 825cdf0..55a2198 100644 --- a/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/BaseAnalyticsCDAPCommonUnitTest.java +++ b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/BaseAnalyticsCDAPCommonUnitTest.java @@ -35,8 +35,8 @@ import org.openecomp.dcae.apod.analytics.test.BaseDCAEAnalyticsUnitTest; */ public abstract class BaseAnalyticsCDAPCommonUnitTest extends BaseDCAEAnalyticsUnitTest { - protected static final String CEF_MESSAGE_FILE_LOCATION = "data/json/cef_message.json"; - protected static final String TCA_POLICY_FILE_LOCATION = "data/json/tca_policy.json"; + protected static final String CEF_MESSAGE_FILE_LOCATION = "data/json/cef/cef_message.json"; + protected static final String TCA_POLICY_FILE_LOCATION = "data/json/policy/tca_policy.json"; protected static final ObjectMapper ANALYTICS_MODEL_OBJECT_MAPPER = Suppliers.memoize(new AnalyticsModelObjectMapperSupplier()).get(); diff --git a/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java new file mode 100644 index 0000000..36d72fa --- /dev/null +++ b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java @@ -0,0 +1,103 @@ +package org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca; + +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Test; +import org.openecomp.dcae.apod.analytics.cdap.common.BaseAnalyticsCDAPCommonUnitTest; +import org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils; +import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader; +import org.openecomp.dcae.apod.analytics.model.domain.cef.Event; +import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Author: rs153v (Rajiv Singla) . Creation Date: 9/13/2017. + */ +public class TCAAlertsAbatementPersisterTest extends BaseAnalyticsCDAPCommonUnitTest { + + private static final String EVENT_NAME = "testEventName"; + private static final String SOURCE_NAME = "testSourceName"; + private static final String REPORTING_ENTITY_NAME = "testReportingEntityName"; + private static final String THRESHOLD_CLOSED_LOOP_CONTROL_NAME = "testControlLoopName"; + private static final String THRESHOLD_FIELD_PATH = "testFieldPath"; + private static final String EXPECTED_LOOKUP_KEY = Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join( + ImmutableList.of(EVENT_NAME, SOURCE_NAME, REPORTING_ENTITY_NAME, + THRESHOLD_CLOSED_LOOP_CONTROL_NAME, THRESHOLD_FIELD_PATH)); + + private ObjectMappedTable<TCAAlertsAbatementEntity> alertsAbatementTable; + private EventListener eventListener; + private MetricsPerEventName violatedMetricsPerEventName; + private TCAVESResponse tcavesResponse; + private String abatementTS; + private Event event; + private CommonEventHeader commonEventHeader; + private Threshold violatedThreshold; + + @Before + public void before() throws Exception { + alertsAbatementTable = mock(ObjectMappedTable.class); + eventListener = mock(EventListener.class); + event = mock(Event.class); + commonEventHeader = mock(CommonEventHeader.class); + + when(eventListener.getEvent()).thenReturn(event); + when(event.getCommonEventHeader()).thenReturn(commonEventHeader); + when(commonEventHeader.getEventName()).thenReturn(EVENT_NAME); + when(commonEventHeader.getSourceName()).thenReturn(SOURCE_NAME); + when(commonEventHeader.getReportingEntityName()).thenReturn(REPORTING_ENTITY_NAME); + + violatedMetricsPerEventName = mock(MetricsPerEventName.class); + when(violatedMetricsPerEventName.getEventName()).thenReturn(EVENT_NAME); + violatedThreshold = mock(Threshold.class); + when(violatedMetricsPerEventName.getThresholds()).thenReturn(ImmutableList.of(violatedThreshold)); + when(violatedThreshold.getClosedLoopControlName()).thenReturn(THRESHOLD_CLOSED_LOOP_CONTROL_NAME); + when(violatedThreshold.getFieldPath()).thenReturn(THRESHOLD_FIELD_PATH); + tcavesResponse = mock(TCAVESResponse.class); + abatementTS = "1234"; + } + + @Test + public void testGetDatasetProperties() throws Exception { + final DatasetProperties datasetProperties = TCAAlertsAbatementPersister.getDatasetProperties(20000); + assertNotNull(datasetProperties); + } + + @Test + public void testPersist() throws Exception { + + TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, + abatementTS, alertsAbatementTable); + verify(alertsAbatementTable, times(1)).write(anyString(), + any(TCAAlertsAbatementEntity.class)); + + } + + @Test + public void testLookUpByKey() throws Exception { + TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, alertsAbatementTable); + verify(alertsAbatementTable, times(1)).read(eq(EXPECTED_LOOKUP_KEY)); + } + + @Test + public void testCreateKey() throws Exception { + final String createdKey = TCAAlertsAbatementPersister.createKey(eventListener, violatedMetricsPerEventName); + assertEquals(createdKey, EXPECTED_LOOKUP_KEY); + + } + +} diff --git a/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersisterTest.java b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersisterTest.java index a062b37..7dfd74d 100644 --- a/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersisterTest.java +++ b/dcae-analytics-cdap-common/src/test/java/org/openecomp/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersisterTest.java @@ -28,11 +28,12 @@ import org.junit.Before; import org.junit.Test; import org.openecomp.dcae.apod.analytics.cdap.common.BaseAnalyticsCDAPCommonUnitTest; import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader; +import org.openecomp.dcae.apod.analytics.model.domain.cef.Domain; import org.openecomp.dcae.apod.analytics.model.domain.cef.Event; import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener; import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction; -import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole; +import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold; import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; @@ -51,8 +52,8 @@ import static org.mockito.Mockito.when; public class TCAMessageStatusPersisterTest extends BaseAnalyticsCDAPCommonUnitTest { private static final int TEST_INSTANCE_ID = 0; - private static final String TEST_DOMAIN = "TEST_DOMAIN"; - private static final String TEST_FUNCTIONAL_ROLE = "TEST_FUNCIONAL_ROLE"; + private static final Domain TEST_DOMAIN = Domain.other; + private static final String TEST_EVENT_NAME = "TEST_EVENT_NAME"; private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable; private TCACEFProcessorContext processorContext; @@ -73,7 +74,7 @@ public class TCAMessageStatusPersisterTest extends BaseAnalyticsCDAPCommonUnitTe when(processorContext.getCEFEventListener()).thenReturn(eventListener); when(eventListener.getEvent()).thenReturn(event); when(event.getCommonEventHeader()).thenReturn(commonEventHeader); - when(commonEventHeader.getFunctionalRole()).thenReturn(TEST_FUNCTIONAL_ROLE); + when(commonEventHeader.getEventName()).thenReturn(TEST_EVENT_NAME); when(commonEventHeader.getDomain()).thenReturn(TEST_DOMAIN); } @@ -88,10 +89,10 @@ public class TCAMessageStatusPersisterTest extends BaseAnalyticsCDAPCommonUnitTe @Test public void testPersistWithNonCompliantMessage() throws Exception { - final MetricsPerFunctionalRole metricsPerFunctionalRole = mock(MetricsPerFunctionalRole.class); + final MetricsPerEventName metricsPerEventName = mock(MetricsPerEventName.class); final Threshold threshold = mock(Threshold.class); - when(processorContext.getMetricsPerFunctionalRole()).thenReturn(metricsPerFunctionalRole); - when((metricsPerFunctionalRole.getThresholds())).thenReturn(ImmutableList.of(threshold)); + when(processorContext.getMetricsPerEventName()).thenReturn(metricsPerEventName); + when((metricsPerEventName.getThresholds())).thenReturn(ImmutableList.of(threshold)); when(threshold.getDirection()).thenReturn(Direction.GREATER); when(threshold.getSeverity()).thenReturn(EventSeverity.CRITICAL); TCAMessageStatusPersister.persist( diff --git a/dcae-analytics-cdap-common/src/test/resources/data/json/cef_message.json b/dcae-analytics-cdap-common/src/test/resources/data/json/cef_message.json deleted file mode 100644 index 52cf53b..0000000 --- a/dcae-analytics-cdap-common/src/test/resources/data/json/cef_message.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "event": { - "measurementsForVfScalingFields": { - "measurementInterval": 10, - "measurementsForVfScalingVersion": 1.1, - "vNicUsageArray": [ - { - "bytesIn": 6086, - "multicastPacketsIn": 0, - "multicastPacketsOut": 0, - "unicastPacketsIn": 0, - "broadcastPacketsOut": 0, - "packetsOut": 42, - "bytesOut": 7156, - "packetsIn": 93, - "broadcastPacketsIn": 0, - "vNicIdentifier": "eth0", - "unicastPacketsOut": 0 - } - ] - }, - "commonEventHeader": { - "reportingEntityName": "vpp-test", - "startEpochMicrosec": 1477070210290442, - "eventId": "375", - "lastEpochMicrosec": 1477070220290442, - "priority": "Normal", - "sequence": 375, - "sourceName": "Dummy VM name - No Metadata available", - "domain": "measurementsForVfScaling", - "functionalRole": "vFirewall", - "reportingEntityId": "No UUID available", - "version": 1.1, - "sourceId": "Dummy VM UUID - No Metadata available" - } - } -} diff --git a/dcae-analytics-cdap-common/src/test/resources/data/json/tca_policy.json b/dcae-analytics-cdap-common/src/test/resources/data/json/tca_policy.json deleted file mode 100644 index 1bf9e83..0000000 --- a/dcae-analytics-cdap-common/src/test/resources/data/json/tca_policy.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "domain": "measurementsForVfScaling", - "metricsPerFunctionalRole": [ - { - "functionalRole": "vFirewall", - "policyScope": "resource=vFirewall;type=configuration", - "policyName": "configuration.dcae.microservice.tca.xml", - "policyVersion": "v0.0.1", - "thresholds": [ - { - "closedLoopControlName": "CL-FRWL-LOW-TRAFFIC-SIG-d925ed73-8231-4d02-9545-db4e101f88f8", - "version": "1.0.2", - "fieldPath": "$.event.measurementsForVfScalingFields.vNicUsageArray[*].bytesIn", - "thresholdValue": 4000, - "direction": "LESS_OR_EQUAL", - "severity": "MAJOR" - }, - { - "closedLoopControlName": "CL-FRWL-HIGH-TRAFFIC-SIG-EA36FE84-9342-5E13-A656-EC5F21309A09", - "version": "1.0.2", - "fieldPath": "$.event.measurementsForVfScalingFields.vNicUsageArray[*].bytesIn", - "thresholdValue": 20000, - "direction": "GREATER_OR_EQUAL", - "severity": "CRITICAL" - } - ] - }, - { - "functionalRole": "vLoadBalancer", - "policyScope": "resource=vLoadBalancer;type=configuration", - "policyName": "configuration.dcae.microservice.tca.xml", - "policyVersion": "v0.0.1", - "thresholds": [ - { - "closedLoopControlName": "CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A", - "version": "1.0.2", - "fieldPath": "$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn", - "thresholdValue": 500, - "direction": "LESS_OR_EQUAL", - "severity": "MAJOR" - }, - { - "closedLoopControlName": "CL-LBAL-LOW-TRAFFIC-SIG-0C5920A6-B564-8035-C878-0E814352BC2B", - "version": "1.0.2", - "fieldPath": "$.event.measurementsForVfScalingFields.vNicUsageArray[*].packetsIn", - "thresholdValue": 5000, - "direction": "GREATER_OR_EQUAL", - "severity": "CRITICAL" - } - ] - } - ] -} |