diff options
Diffstat (limited to 'dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca')
7 files changed, 1298 insertions, 0 deletions
diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java new file mode 100644 index 0000000..6c6aaea --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementEntity.java @@ -0,0 +1,157 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import com.google.common.base.Objects; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +/** + * TCA Alerts Abatement Entity is used to persist information to determine if abatement event need to sent to downstream + * systems + * + * @author Rajiv Singla . Creation Date: 9/11/2017. + */ +public class TCAAlertsAbatementEntity implements Writable, Serializable { + + private static final long serialVersionUID = 1L; + + private long creationTS; + private String requestId; + // Kept as string to avoid null checks + private String abatementSentTS; + + /** + * No Arg constructor required for Jackson Json Serialization / Deserialization + */ + public TCAAlertsAbatementEntity() { + // required no arg constructor + } + + /** + * Creates TCA Alerts Abatement Entity to persist information to determine if abatement alerts need to be posted + * + * @param creationTS record creation time + * @param requestId request ID of generated alert + * @param abatementSentTS time when abatement was sent out for that alert if any + */ + public TCAAlertsAbatementEntity(long creationTS, String requestId, String abatementSentTS) { + this.creationTS = creationTS; + this.requestId = requestId; + this.abatementSentTS = abatementSentTS; + } + + /** + * Timestamp when record was created + * + * @return timestamp when record was created + */ + public long getCreationTS() { + return creationTS; + } + + /** + * Set value for timestamp when record was created + * + * @param creationTS new value for timestamp when record was created + */ + public void setCreationTS(long creationTS) { + this.creationTS = creationTS; + } + + /** + * Request Id of ONSET alert which was sent + * + * @return request Id of ONSET alert which was sent + */ + public String getRequestId() { + return requestId; + } + + /** + * Set Request Id of ONSET alert + * + * @param requestId set new value for ONSET alert request id + */ + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + + /** + * Get abatement Sent Timestamp + * + * @return get abatement alert sent timestamp + */ + public String getAbatementSentTS() { + return abatementSentTS; + } + + /** + * Set timestamp when abatement alert is sent + * + * @param abatementSentTS sent new value for timestamp when abatement alert is sent + */ + public void setAbatementSentTS(String abatementSentTS) { + this.abatementSentTS = abatementSentTS; + } + + /** + * Write entity to Table + * + * @param dataOutput data output + * @throws IOException io exception + */ + @Override + public void write(DataOutput dataOutput) throws IOException { + WritableUtils.writeVLong(dataOutput, creationTS); + WritableUtils.writeString(dataOutput, requestId); + WritableUtils.writeString(dataOutput, abatementSentTS); + } + + /** + * Read entity from table + * + * @param dataInput data input + * @throws IOException io exception + */ + @Override + public void readFields(DataInput dataInput) throws IOException { + creationTS = WritableUtils.readVLong(dataInput); + requestId = WritableUtils.readString(dataInput); + abatementSentTS = WritableUtils.readString(dataInput); + } + + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("creationTS", creationTS) + .add("requestId", requestId) + .add("abatementSentTS", abatementSentTS) + .toString(); + } +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java new file mode 100644 index 0000000..19cf9c7 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java @@ -0,0 +1,127 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.data.schema.UnsupportedTypeException; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.IndexedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils; +import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.List; + +import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; + +/** + * Utility methods to persist TCA Alerts Abatement information + * + * @author Rajiv Singla . Creation Date: 9/11/2017. + */ +public abstract class TCAAlertsAbatementPersister { + + private static final Logger LOG = LoggerFactory.getLogger(TCAAlertsAbatementPersister.class); + + private static final Joiner KEY_JOINER = Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER); + + private TCAAlertsAbatementPersister() { + // private constructor + } + + /** + * Creates {@link DatasetProperties} for Alerts Table + * + * @param timeToLiveSeconds alerts table Time to Live + * + * @return Alerts Abatement table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + try { + return ObjectMappedTableProperties.builder() + .setType(TCAAlertsAbatementEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(CDAPComponentsConstants.TCA_FIXED_ALERTS_ABATEMENT_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAAlertsAbatementEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + + public static void persist(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName, + final TCAVESResponse tcavesResponse, + final String abatementTS, + final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable) { + final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); + + final long currentTimestamp = new Date().getTime(); + final String requestID = tcavesResponse.getRequestID(); + final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp, + requestID, abatementTS); + tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity); + + LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey); + + } + + public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName, + final ObjectMappedTable<TCAAlertsAbatementEntity> + tcaAlertsAbatementTable) { + final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); + return tcaAlertsAbatementTable.read(abatementTableKey); + } + + + public static String createKey(final EventListener eventListener, + final MetricsPerEventName violatedMetricsPerEventName) { + // no null check required as all are required fields + final String eventName = violatedMetricsPerEventName.getEventName(); + final String sourceName = eventListener.getEvent().getCommonEventHeader().getSourceName(); + final String reportingEntityName = eventListener.getEvent().getCommonEventHeader().getReportingEntityName(); + // violated threshold will always be present + final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0); + final String closedLoopControlName = violatedThreshold.getClosedLoopControlName(); + final String fieldPath = violatedThreshold.getFieldPath(); + + final List<String> abatementKeyList = + ImmutableList.of(eventName, sourceName, reportingEntityName, closedLoopControlName, fieldPath); + + return KEY_JOINER.join(abatementKeyList); + } + +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCACalculatorMessageType.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCACalculatorMessageType.java new file mode 100644 index 0000000..4717a90 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCACalculatorMessageType.java @@ -0,0 +1,43 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +/** + * TCA Calculator applies TCA Policy to incoming VES messages and classifies them as per this enum + * + * @author Rajiv Singla . Creation Date: 11/15/2016. + */ +public enum TCACalculatorMessageType { + + /** + * VES messages that are not applicable as per TCA Policy + */ + INAPPLICABLE, + /** + * VES messages that are applicable as per TCA Policy but don't violate any thresholds + */ + COMPLIANT, + /** + * VES messages that are applicable as per TCA Policy and also in violation of TCA Policy thresholds + */ + NON_COMPLIANT; + +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java new file mode 100644 index 0000000..b3edc64 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusEntity.java @@ -0,0 +1,550 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +/** + * TCA Message Status is an Entity which is used to persist TCA VES Message status information in Message Status Table + * + * @author Rajiv Singla . Creation Date: 11/16/2016. + */ +public class TCAMessageStatusEntity implements Writable, Serializable { + + private static final long serialVersionUID = 1L; + + private long creationTS; + private int instanceId; + private String messageType; + private String vesMessage; + private String domain; + private String eventName; + private String thresholdPath; + private String thresholdSeverity; + private String thresholdDirection; + private Long thresholdValue; + private String jsonProcessorStatus; + private String jsonProcessorMessage; + private String domainFilterStatus; + private String domainFilterMessage; + private String eventNameFilterStatus; + private String eventNameFilterMessage; + private String thresholdCalculatorStatus; + private String thresholdCalculatorMessage; + private String alertMessage; + + /** + * No Arg constructor required for Jackson Json Serialization / Deserialization + */ + public TCAMessageStatusEntity() { + // no argument constructor required for json serialization / deserialization + } + + /** + * Create new Instance of {@link TCAMessageStatusEntity} + * + * @param creationTS creation Timestamp + * @param instanceId CDAP flowlet instance ID + * @param messageType {@link TCACalculatorMessageType} + * @param vesMessage incoming VES message from collector + * @param domain VES message domain if present + * @param eventName VES message functional role if present + */ + public TCAMessageStatusEntity(final long creationTS, final int instanceId, final String messageType, + final String vesMessage, final String domain, final String eventName) { + this(creationTS, instanceId, messageType, vesMessage, domain, eventName, null, null, null, null, + null, null, null, null, null, null, null, null, null); + } + + + /** + * Create new Instance of {@link TCAMessageStatusEntity} + * + * @param creationTS creation Timestamp + * @param instanceId CDAP flowlet instance ID + * @param messageType {@link TCACalculatorMessageType} + * @param vesMessage incoming VES message from collector + * @param domain VES message domain if present + * @param eventName VES message event name if present + * @param thresholdPath Violated threshold path + * @param thresholdSeverity Violated threshold Severity if any + * @param thresholdDirection Violated threshold Direction if any + * @param thresholdValue Violated threshold value if any + * @param jsonProcessorStatus {@link TCACEFJsonProcessor} status + * @param jsonProcessorMessage {@link TCACEFJsonProcessor} message + * @param domainFilterStatus {@link TCACEFPolicyDomainFilter} status + * @param domainFilterMessage {@link TCACEFPolicyDomainFilter} message + * @param eventNameFilterStatus {@link TCACEFPolicyEventNameFilter} status + * @param eventNameFilterMessage {@link TCACEFPolicyEventNameFilter} message + * @param thresholdCalculatorStatus {@link TCACEFPolicyThresholdsProcessor} status + * @param thresholdCalculatorMessage {@link TCACEFPolicyThresholdsProcessor} message + * @param alertMessage alert message that will be sent out in case of threshold violation + */ + public TCAMessageStatusEntity(long creationTS, int instanceId, String messageType, String vesMessage, + String domain, String eventName, + String thresholdPath, String thresholdSeverity, String thresholdDirection, + Long thresholdValue, + String jsonProcessorStatus, String jsonProcessorMessage, + String domainFilterStatus, String domainFilterMessage, + String eventNameFilterStatus, String eventNameFilterMessage, + String thresholdCalculatorStatus, String thresholdCalculatorMessage, + String alertMessage) { + this.creationTS = creationTS; + this.instanceId = instanceId; + this.messageType = messageType; + this.vesMessage = vesMessage; + this.domain = domain; + this.eventName = eventName; + this.thresholdPath = thresholdPath; + this.thresholdSeverity = thresholdSeverity; + this.thresholdDirection = thresholdDirection; + this.thresholdValue = thresholdValue; + this.jsonProcessorStatus = jsonProcessorStatus; + this.jsonProcessorMessage = jsonProcessorMessage; + this.domainFilterStatus = domainFilterStatus; + this.domainFilterMessage = domainFilterMessage; + this.eventNameFilterStatus = eventNameFilterStatus; + this.eventNameFilterMessage = eventNameFilterMessage; + this.thresholdCalculatorStatus = thresholdCalculatorStatus; + this.thresholdCalculatorMessage = thresholdCalculatorMessage; + this.alertMessage = alertMessage; + } + + /** + * Provides Creation Timestamp + * + * @return creation timestamp long value + */ + public long getCreationTS() { + return creationTS; + } + + /** + * Sets Creations Timestamp + * + * @param creationTS creation timestamp long value + */ + public void setCreationTS(long creationTS) { + this.creationTS = creationTS; + } + + + /** + * Provides CDAP Flowlet instance ID + * + * @return cdap flowlet instance ID + */ + public int getInstanceId() { + return instanceId; + } + + /** + * Sets CDAP Flowlet instance ID + * + * @param instanceId flowlet instance ID + */ + public void setInstanceId(int instanceId) { + this.instanceId = instanceId; + } + + /** + * Provides Message Calculator Type {@link TCACalculatorMessageType} + * + * @return calculator message type + */ + public String getMessageType() { + return messageType; + } + + /** + * Sets Calculator message Type {@link TCACalculatorMessageType} + * + * @param messageType calculator message type + */ + public void setMessageType(String messageType) { + this.messageType = messageType; + } + + /** + * Provides incoming VES Message + * + * @return ves message + */ + public String getVesMessage() { + return vesMessage; + } + + /** + * Set new value for VES message + * + * @param vesMessage ves message + */ + public void setVesMessage(String vesMessage) { + this.vesMessage = vesMessage; + } + + /** + * Provides VES message Domain + * + * @return ves message domain + */ + public String getDomain() { + return domain; + } + + /** + * Sets VES Message Domain + * + * @param domain ves message domain + */ + public void setDomain(String domain) { + this.domain = domain; + } + + /** + * Provides VES Message Event Name + * + * @return ves message Event Name + */ + public String getEventName() { + return eventName; + } + + /** + * Sets VES Message Functional Role + * + * @param eventName ves message Functional Role + */ + public void setEventName(String eventName) { + this.eventName = eventName; + } + + /** + * Violated Threshold Path as extracted from {@link TCAPolicy} + * + * @return violated threshold path + */ + public String getThresholdPath() { + return thresholdPath; + } + + /** + * Sets value for Violated Threshold Path + * + * @param thresholdPath violated threshold path + */ + public void setThresholdPath(String thresholdPath) { + this.thresholdPath = thresholdPath; + } + + /** + * Violated threshold Event Severity + * + * @return event severity + */ + public String getThresholdSeverity() { + return thresholdSeverity; + } + + /** + * Violated Threshold Severity + * + * @param thresholdSeverity violated threshold severity + */ + public void setThresholdSeverity(String thresholdSeverity) { + this.thresholdSeverity = thresholdSeverity; + } + + /** + * Violated Threshold {@link Direction} + * + * @return violated threshold Direction + */ + public String getThresholdDirection() { + return thresholdDirection; + } + + /** + * Sets Violated Threshold Direction + * + * @param thresholdDirection violated threshold direction + */ + public void setThresholdDirection(String thresholdDirection) { + this.thresholdDirection = thresholdDirection; + } + + /** + * Provides Violated Threshold Value + * + * @return violated Threshold value + */ + public Long getThresholdValue() { + return thresholdValue; + } + + /** + * Sets Violated Threshold Value + * + * @param thresholdValue violated threshold value + */ + public void setThresholdValue(Long thresholdValue) { + this.thresholdValue = thresholdValue; + } + + /** + * Provides {@link TCACEFJsonProcessor} status + * + * @return json processor status + */ + public String getJsonProcessorStatus() { + return jsonProcessorStatus; + } + + /** + * Sets Json Processor status + * + * @param jsonProcessorStatus json processor status + */ + public void setJsonProcessorStatus(String jsonProcessorStatus) { + this.jsonProcessorStatus = jsonProcessorStatus; + } + + /** + * Provides {@link TCACEFJsonProcessor} message + * + * @return json processor message + */ + public String getJsonProcessorMessage() { + return jsonProcessorMessage; + } + + /** + * Sets Json Processor Message + * + * @param jsonProcessorMessage json processor message + */ + public void setJsonProcessorMessage(String jsonProcessorMessage) { + this.jsonProcessorMessage = jsonProcessorMessage; + } + + /** + * Provides {@link TCACEFPolicyDomainFilter} status + * + * @return domain filter status + */ + public String getDomainFilterStatus() { + return domainFilterStatus; + } + + /** + * Sets Domain Filter status + * + * @param domainFilterStatus domain filter status + */ + public void setDomainFilterStatus(String domainFilterStatus) { + this.domainFilterStatus = domainFilterStatus; + } + + /** + * Provides {@link TCACEFPolicyDomainFilter} message + * + * @return domain filter message + */ + public String getDomainFilterMessage() { + return domainFilterMessage; + } + + /** + * Sets Domain filter message + * + * @param domainFilterMessage domain filter message + */ + public void setDomainFilterMessage(String domainFilterMessage) { + this.domainFilterMessage = domainFilterMessage; + } + + public String getEventNameFilterStatus() { + return eventNameFilterStatus; + } + + /** + * Provides {@link TCACEFPolicyEventNameFilter} status + * + * @param eventNameFilterStatus functional Role filter status + */ + public void setEventNameFilterStatus(String eventNameFilterStatus) { + this.eventNameFilterStatus = eventNameFilterStatus; + } + + /** + * Provides {@link TCACEFPolicyEventNameFilter} message + * + * @return functional role filter message + */ + public String getEventNameFilterMessage() { + return eventNameFilterMessage; + } + + /** + * Sets Functional Role filter message + * + * @param eventNameFilterMessage functional role filter message + */ + public void setEventNameFilterMessage(String eventNameFilterMessage) { + this.eventNameFilterMessage = eventNameFilterMessage; + } + + /** + * Provides {@link TCACEFPolicyThresholdsProcessor} status + * + * @return threshold processor status + */ + public String getThresholdCalculatorStatus() { + return thresholdCalculatorStatus; + } + + /** + * Sets threshold calculator status + * + * @param thresholdCalculatorStatus threshold calculator status + */ + public void setThresholdCalculatorStatus(String thresholdCalculatorStatus) { + this.thresholdCalculatorStatus = thresholdCalculatorStatus; + } + + /** + * Provides {@link TCACEFPolicyThresholdsProcessor} message + * + * @return threshold processor message + */ + public String getThresholdCalculatorMessage() { + return thresholdCalculatorMessage; + } + + /** + * Sets Threshold Calculator Processor Message + * + * @param thresholdCalculatorMessage threshold calculator message + */ + public void setThresholdCalculatorMessage(String thresholdCalculatorMessage) { + this.thresholdCalculatorMessage = thresholdCalculatorMessage; + } + + /** + * Provides generated alert message + * + * @return generated alert message + */ + public String getAlertMessage() { + return alertMessage; + } + + /** + * Sets alert message + * + * @param alertMessage alert message + */ + public void setAlertMessage(String alertMessage) { + this.alertMessage = alertMessage; + } + + /** + * Write entity to Table + * + * @param dataOutput data output + * + * @throws IOException io exception + */ + @Override + public void write(DataOutput dataOutput) throws IOException { + WritableUtils.writeVLong(dataOutput, creationTS); + WritableUtils.writeVInt(dataOutput, instanceId); + WritableUtils.writeString(dataOutput, messageType); + WritableUtils.writeString(dataOutput, vesMessage); + + WritableUtils.writeString(dataOutput, domain); + WritableUtils.writeString(dataOutput, eventName); + + WritableUtils.writeString(dataOutput, thresholdPath); + WritableUtils.writeString(dataOutput, thresholdSeverity); + WritableUtils.writeString(dataOutput, thresholdDirection); + WritableUtils.writeVLong(dataOutput, thresholdValue); + + WritableUtils.writeString(dataOutput, jsonProcessorStatus); + WritableUtils.writeString(dataOutput, jsonProcessorMessage); + WritableUtils.writeString(dataOutput, domainFilterStatus); + WritableUtils.writeString(dataOutput, domainFilterMessage); + WritableUtils.writeString(dataOutput, eventNameFilterStatus); + WritableUtils.writeString(dataOutput, eventNameFilterMessage); + WritableUtils.writeString(dataOutput, thresholdCalculatorStatus); + WritableUtils.writeString(dataOutput, thresholdCalculatorMessage); + + WritableUtils.writeString(dataOutput, alertMessage); + + } + + /** + * Read entity from table + * + * @param dataInput data input + * @throws IOException io exception + */ + @Override + public void readFields(DataInput dataInput) throws IOException { + creationTS = WritableUtils.readVLong(dataInput); + instanceId = WritableUtils.readVInt(dataInput); + messageType = WritableUtils.readString(dataInput); + vesMessage = WritableUtils.readString(dataInput); + + domain = WritableUtils.readString(dataInput); + eventName = WritableUtils.readString(dataInput); + + thresholdPath = WritableUtils.readString(dataInput); + thresholdSeverity = WritableUtils.readString(dataInput); + thresholdDirection = WritableUtils.readString(dataInput); + thresholdValue = WritableUtils.readVLong(dataInput); + + jsonProcessorStatus = WritableUtils.readString(dataInput); + jsonProcessorMessage = WritableUtils.readString(dataInput); + domainFilterStatus = WritableUtils.readString(dataInput); + domainFilterMessage = WritableUtils.readString(dataInput); + eventNameFilterStatus = WritableUtils.readString(dataInput); + eventNameFilterMessage = WritableUtils.readString(dataInput); + thresholdCalculatorStatus = WritableUtils.readString(dataInput); + thresholdCalculatorMessage = WritableUtils.readString(dataInput); + + alertMessage = WritableUtils.readString(dataInput); + + } + +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java new file mode 100644 index 0000000..89d8d00 --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAMessageStatusPersister.java @@ -0,0 +1,241 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.data.schema.UnsupportedTypeException; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.IndexedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties; +import com.google.common.base.Joiner; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.common.service.processor.MessageProcessor; +import org.onap.dcae.apod.analytics.common.service.processor.ProcessorContext; +import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; +import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor; +import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext; +import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import javax.annotation.Nullable; + +import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; + +/** + * + * + * @author Rajiv Singla . Creation Date: 11/15/2016. + */ +public abstract class TCAMessageStatusPersister { + + private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class); + + private TCAMessageStatusPersister() { + + } + + /** + * Saves Message Status in Table. Assumes no alert was generated + * + * @param processorContext processor Context + * @param instanceId Instance Id + * @param calculatorMessageType Calculation Message Type + * @param messageStatusTable Message Status Table + */ + public static void persist(final TCACEFProcessorContext processorContext, + final int instanceId, + final TCACalculatorMessageType calculatorMessageType, + final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) { + persist(processorContext, instanceId, calculatorMessageType, messageStatusTable, null); + } + + /** + * Saves Message Status in Table. Sets up alert message aslo + * + * @param processorContext processor Context + * @param instanceId Instance Id + * @param calculatorMessageType Calculation Message Type + * @param messageStatusTable Message Status Table + * @param alertMessage Alert message + */ + public static void persist(final TCACEFProcessorContext processorContext, + final int instanceId, + final TCACalculatorMessageType calculatorMessageType, + final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable, + @Nullable final String alertMessage) { + + final String rowKey = createKey(calculatorMessageType); + + final Long currentTS = new Date().getTime(); + final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage()); + + // Find Functional Role and domain + final Pair<String, String> domainAndEventName = TCAUtils.getDomainAndEventName(processorContext); + final String domain = domainAndEventName.getLeft(); + final String eventName = domainAndEventName.getRight(); + + final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS, + instanceId, calculatorMessageType.name(), vesMessage, domain, eventName); + + // add threshold violation fields + addViolatedThreshold(tcaMessageStatusEntity, processorContext); + // add processor status and messages + addMessageProcessorMessages(tcaMessageStatusEntity, processorContext); + // add Alert message + tcaMessageStatusEntity.setAlertMessage( + alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage) + ); + + messageStatusTable.write(rowKey, tcaMessageStatusEntity); + + LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey); + + } + + + /** + * Create TCA VES Message Status Table Properties + * + * @param timeToLiveSeconds Message Status Table time to live in seconds + * + * @return Message Status table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + + try { + return ObjectMappedTableProperties.builder() + .setType(TCAMessageStatusEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + } + + + /** + * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity} + * + * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields + * @param processorContext processor context + * + */ + private static void addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity, + final TCACEFProcessorContext processorContext) { + + final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName(); + + if (metricsPerEventName != null + && metricsPerEventName.getThresholds() != null + && metricsPerEventName.getThresholds().get(0) != null) { + + final Threshold threshold = metricsPerEventName.getThresholds().get(0); + tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath()); + tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name()); + tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name()); + tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue()); + } + + } + + + /** + * Add TCA CEF Message Processor status information + * + * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields + * @param processorContext processor context + * + */ + @SuppressWarnings("unchecked") + private static void addMessageProcessorMessages( + final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) { + final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext + .getMessageProcessors(); + + if (messageProcessors != null && !messageProcessors.isEmpty()) { + for (Object messageProcessor : messageProcessors) { + final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor = + (MessageProcessor<TCACEFProcessorContext>) messageProcessor; + + final String processingState = tcaMessageProcessor.getProcessingState().name(); + final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull(); + + if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) { + tcaMessageStatusEntity.setJsonProcessorStatus(processingState); + tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage); + } + + if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) { + tcaMessageStatusEntity.setDomainFilterStatus(processingState); + tcaMessageStatusEntity.setDomainFilterMessage(processingMessage); + } + + if (messageProcessor.getClass().equals(TCACEFPolicyEventNameFilter.class)) { + tcaMessageStatusEntity.setEventNameFilterStatus(processingState); + tcaMessageStatusEntity.setEventNameFilterMessage(processingMessage); + } + + if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) { + tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState); + tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage); + } + + } + } + } + + /** + * Creates Row Key for TCA VES Message Status table + * + * Row Key = (Message Type + Decreasing Value) + * + * @param calculatorMessageType calculator message type + * + * @return row key string + */ + private static String createKey(final TCACalculatorMessageType calculatorMessageType) { + + final List<String> keyList = new LinkedList<>(); + keyList.add(calculatorMessageType.name()); + keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey()); + return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList); + } + +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertEntity.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertEntity.java new file mode 100644 index 0000000..77af19a --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertEntity.java @@ -0,0 +1,78 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +/** + * + * @author Rajiv Singla . Creation Date: 11/16/2016. + */ +public class TCAVESAlertEntity implements Writable, Serializable { + + private static final long serialVersionUID = 1L; + + private long creationTS; + private String alertMessage; + + public TCAVESAlertEntity() { + // no argument constructor required for json serialization / deserialization + } + + public TCAVESAlertEntity(long creationTS, String alertMessage) { + this.creationTS = creationTS; + this.alertMessage = alertMessage; + } + + public long getCreationTS() { + return creationTS; + } + + public void setCreationTS(long creationTS) { + this.creationTS = creationTS; + } + + public String getAlertMessage() { + return alertMessage; + } + + public void setAlertMessage(String alertMessage) { + this.alertMessage = alertMessage; + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + WritableUtils.writeVLong(dataOutput, creationTS); + WritableUtils.writeString(dataOutput, alertMessage); + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + creationTS = WritableUtils.readVLong(dataInput); + alertMessage = WritableUtils.readString(dataInput); + } +} diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java new file mode 100644 index 0000000..252197c --- /dev/null +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAVESAlertsPersister.java @@ -0,0 +1,102 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.cdap.common.persistance.tca; + +import co.cask.cdap.api.data.schema.Schema; +import co.cask.cdap.api.data.schema.UnsupportedTypeException; +import co.cask.cdap.api.dataset.DatasetProperties; +import co.cask.cdap.api.dataset.lib.IndexedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTable; +import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties; +import org.apache.commons.lang3.StringEscapeUtils; +import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; + +/** + * + * @author Rajiv Singla . Creation Date: 11/16/2016. + */ +public abstract class TCAVESAlertsPersister { + + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsPersister.class); + + private TCAVESAlertsPersister() { + + } + + /** + * Persists Alert Message to Alerts Table + * + * @param alertMessage alert Message + * @param tcaVESAlertTable alert Table Name + */ + public static void persist(final String alertMessage, final ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertTable) { + final Date currentDate = new Date(); + final TCAVESAlertEntity alertEntity = new TCAVESAlertEntity(currentDate.getTime(), + StringEscapeUtils.unescapeJson(alertMessage)); + // row key is same as current timestamp + final String rowKey = createRowKey(currentDate); + tcaVESAlertTable.write(rowKey, alertEntity); + + LOG.debug("Finished persisting VES Alert message ID: {} in VES Alerts table.", rowKey); + } + + + /** + * Creates {@link DatasetProperties} for Alerts Table + * + * @param timeToLiveSeconds alerts table Time to Live + * @return Alerts table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + try { + return ObjectMappedTableProperties.builder() + .setType(TCAVESAlertEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAVESAlertEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + + } + + /** + * Creates Row Key for Alerts Table + * + * @param date current Date + * + * @return row key + */ + public static String createRowKey(final Date date) { + return String.format("%025d", date.getTime()); + } + +} |