diff options
Diffstat (limited to 'dcae-analytics-tca/src')
2 files changed, 212 insertions, 212 deletions
diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAMessageStatusPersister.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAMessageStatusPersister.java index 1ac8bfa..d881e5f 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAMessageStatusPersister.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAMessageStatusPersister.java @@ -55,189 +55,189 @@ import static org.openecomp.dcae.apod.analytics.common.CDAPComponentsConstants.T import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; /** - * - * * @author Rajiv Singla. Creation Date: 11/15/2016. */ public abstract class TCAMessageStatusPersister { - private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class); - - /** - * Saves Message Status in Table. Assumes no alert was generated - * - * @param processorContext processor Context - * @param flowletContext Flowlet Context - * @param calculatorMessageType Calculation Message Type - * @param messageStatusTable Message Status Table - */ - public static void persist(final TCACEFProcessorContext processorContext, - final FlowletContext flowletContext, - final TCACalculatorMessageType calculatorMessageType, - final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) { - persist(processorContext, flowletContext, calculatorMessageType, messageStatusTable, null); + private static final Logger LOG = LoggerFactory.getLogger(TCAMessageStatusPersister.class); + + /** + * Saves Message Status in Table. Assumes no alert was generated + * + * @param processorContext processor Context + * @param flowletContext Flowlet Context + * @param calculatorMessageType Calculation Message Type + * @param messageStatusTable Message Status Table + */ + public static void persist(final TCACEFProcessorContext processorContext, + final FlowletContext flowletContext, + final TCACalculatorMessageType calculatorMessageType, + final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable) { + persist(processorContext, flowletContext, calculatorMessageType, messageStatusTable, null); + } + + /** + * Saves Message Status in Table. Sets up alert message aslo + * + * @param processorContext processor Context + * @param flowletContext Flowlet Context + * @param calculatorMessageType Calculation Message Type + * @param messageStatusTable Message Status Table + * @param alertMessage Alert message + */ + public static void persist(final TCACEFProcessorContext processorContext, + final FlowletContext flowletContext, + final TCACalculatorMessageType calculatorMessageType, + final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable, + @Nullable final String alertMessage) { + + final String rowKey = createKey(calculatorMessageType); + + final Long currentTS = new Date().getTime(); + final int flowletInstanceId = flowletContext.getInstanceId(); + final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage()); + + // Find Functional Role and domain + final Pair<String, String> domainAndFunctionalRole = TCAUtils + .getDomainAndFunctionalRole(processorContext); + final String domain = domainAndFunctionalRole.getLeft(); + final String functionalRole = domainAndFunctionalRole.getRight(); + + final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS, + flowletInstanceId, calculatorMessageType.name(), vesMessage, domain, functionalRole); + + // add threshold violation fields + addViolatedThreshold(tcaMessageStatusEntity, processorContext); + // add processor status and messages + addMessageProcessorMessages(tcaMessageStatusEntity, processorContext); + // add Alert message + tcaMessageStatusEntity.setAlertMessage( + alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage) + ); + + messageStatusTable.write(rowKey, tcaMessageStatusEntity); + + LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", + rowKey); + + } + + + /** + * Create TCA VES Message Status Table Properties + * + * @param timeToLiveSeconds Message Status Table time to live in seconds + * @return Message Status table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + + try { + return ObjectMappedTableProperties.builder() + .setType(TCAMessageStatusEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new + IllegalArgumentException(errorMessage, e)); } - /** - * Saves Message Status in Table. Sets up alert message aslo - * - * @param processorContext processor Context - * @param flowletContext Flowlet Context - * @param calculatorMessageType Calculation Message Type - * @param messageStatusTable Message Status Table - * @param alertMessage Alert message - */ - public static void persist(final TCACEFProcessorContext processorContext, - final FlowletContext flowletContext, - final TCACalculatorMessageType calculatorMessageType, - final ObjectMappedTable<TCAMessageStatusEntity> messageStatusTable, - @Nullable final String alertMessage) { - - final String rowKey = createKey(calculatorMessageType); - - final Long currentTS = new Date().getTime(); - final int flowletInstanceId = flowletContext.getInstanceId(); - final String vesMessage = StringEscapeUtils.unescapeJson(processorContext.getMessage()); - - // Find Functional Role and domain - final Pair<String, String> domainAndFunctionalRole = TCAUtils.getDomainAndFunctionalRole(processorContext); - final String domain = domainAndFunctionalRole.getLeft(); - final String functionalRole = domainAndFunctionalRole.getRight(); - - final TCAMessageStatusEntity tcaMessageStatusEntity = new TCAMessageStatusEntity(currentTS, - flowletInstanceId, calculatorMessageType.name(), vesMessage, domain, functionalRole); - - // add threshold violation fields - addViolatedThreshold(tcaMessageStatusEntity, processorContext); - // add processor status and messages - addMessageProcessorMessages(tcaMessageStatusEntity, processorContext); - // add Alert message - tcaMessageStatusEntity.setAlertMessage( - alertMessage == null ? null : StringEscapeUtils.unescapeJson(alertMessage) - ); - - messageStatusTable.write(rowKey, tcaMessageStatusEntity); - - - LOG.debug("Finished persisting VES Status Message with rowKey: {} in Message Status Table.", rowKey); + } - } + /** + * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity} + * + * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields + * @param processorContext processor context + * @return entity with populated threshold field values if present + */ + private static TCAMessageStatusEntity addViolatedThreshold( + final TCAMessageStatusEntity tcaMessageStatusEntity, + final TCACEFProcessorContext processorContext) { - /** - * Create TCA VES Message Status Table Properties - * - * @param timeToLiveSeconds Message Status Table time to live in seconds - * - * @return Message Status table properties - */ - public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { - - try { - return ObjectMappedTableProperties.builder() - .setType(TCAMessageStatusEntity.class) - .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) - .setRowKeyExploreType(Schema.Type.STRING) - .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) - .setDescription(TCA_FIXED_VES_MESSAGE_STATUS_DESCRIPTION_TABLE) - .build(); - } catch (UnsupportedTypeException e) { - final String errorMessage = "Unable to convert TCAMessageStatusEntity class to Schema"; - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } + final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext + .getMetricsPerFunctionalRole(); - } + if (metricsPerFunctionalRole != null + && metricsPerFunctionalRole.getThresholds() != null + && metricsPerFunctionalRole.getThresholds().get(0) != null) { + final Threshold threshold = metricsPerFunctionalRole.getThresholds().get(0); + tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath()); + tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name()); + tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name()); + tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue()); + } - /** - * Adds Violated Threshold Parameter values to {@link TCAMessageStatusEntity} - * - * @param tcaMessageStatusEntity message entity that needs to be populated with threshold fields - * @param processorContext processor context - * - * @return entity with populated threshold field values if present - */ - private static TCAMessageStatusEntity addViolatedThreshold(final TCAMessageStatusEntity tcaMessageStatusEntity, - final TCACEFProcessorContext processorContext) { - - final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext.getMetricsPerFunctionalRole(); - - if (metricsPerFunctionalRole != null - && metricsPerFunctionalRole.getThresholds() != null - && metricsPerFunctionalRole.getThresholds().get(0) != null) { - - final Threshold threshold = metricsPerFunctionalRole.getThresholds().get(0); - tcaMessageStatusEntity.setThresholdPath(threshold.getFieldPath()); - tcaMessageStatusEntity.setThresholdSeverity(threshold.getSeverity().name()); - tcaMessageStatusEntity.setThresholdDirection(threshold.getDirection().name()); - tcaMessageStatusEntity.setThresholdValue(threshold.getThresholdValue()); + return tcaMessageStatusEntity; + } + + + /** + * Add TCA CEF Message Processor status information + * + * @param tcaMessageStatusEntity message entity that needs to be populated with message processor + * fields + * @param processorContext processor context + * @return entity with populated message process status information + */ + private static TCAMessageStatusEntity addMessageProcessorMessages( + final TCAMessageStatusEntity tcaMessageStatusEntity, + final TCACEFProcessorContext processorContext) { + final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext + .getMessageProcessors(); + + if (messageProcessors != null && !messageProcessors.isEmpty()) { + for (Object messageProcessor : messageProcessors) { + final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor = + (MessageProcessor<TCACEFProcessorContext>) messageProcessor; + + final String processingState = tcaMessageProcessor.getProcessingState().name(); + final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull(); + + if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) { + tcaMessageStatusEntity.setJsonProcessorStatus(processingState); + tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage); } - return tcaMessageStatusEntity; - } + if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) { + tcaMessageStatusEntity.setDomainFilterStatus(processingState); + tcaMessageStatusEntity.setDomainFilterMessage(processingMessage); + } + if (messageProcessor.getClass().equals(TCACEFPolicyFunctionalRoleFilter.class)) { + tcaMessageStatusEntity.setFunctionalRoleFilterStatus(processingState); + tcaMessageStatusEntity.setFunctionalRoleFilterMessage(processingMessage); + } - /** - * Add TCA CEF Message Processor status information - * - * @param tcaMessageStatusEntity message entity that needs to be populated with message processor fields - * @param processorContext processor context - * - * @return entity with populated message process status information - */ - private static TCAMessageStatusEntity addMessageProcessorMessages( - final TCAMessageStatusEntity tcaMessageStatusEntity, final TCACEFProcessorContext processorContext) { - final List<? super MessageProcessor<? extends ProcessorContext>> messageProcessors = processorContext - .getMessageProcessors(); - - if (messageProcessors != null && !messageProcessors.isEmpty()) { - for (Object messageProcessor : messageProcessors) { - final MessageProcessor<TCACEFProcessorContext> tcaMessageProcessor = - (MessageProcessor<TCACEFProcessorContext>) messageProcessor; - - final String processingState = tcaMessageProcessor.getProcessingState().name(); - final String processingMessage = tcaMessageProcessor.getProcessingMessage().orNull(); - - if (messageProcessor.getClass().equals(TCACEFJsonProcessor.class)) { - tcaMessageStatusEntity.setJsonProcessorStatus(processingState); - tcaMessageStatusEntity.setJsonProcessorMessage(processingMessage); - } - - if (messageProcessor.getClass().equals(TCACEFPolicyDomainFilter.class)) { - tcaMessageStatusEntity.setDomainFilterStatus(processingState); - tcaMessageStatusEntity.setDomainFilterMessage(processingMessage); - } - - if (messageProcessor.getClass().equals(TCACEFPolicyFunctionalRoleFilter.class)) { - tcaMessageStatusEntity.setFunctionalRoleFilterStatus(processingState); - tcaMessageStatusEntity.setFunctionalRoleFilterMessage(processingMessage); - } - - if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) { - tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState); - tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage); - } - - } + if (messageProcessor.getClass().equals(TCACEFPolicyThresholdsProcessor.class)) { + tcaMessageStatusEntity.setThresholdCalculatorStatus(processingState); + tcaMessageStatusEntity.setThresholdCalculatorMessage(processingMessage); } - return tcaMessageStatusEntity; - } - /** - * Creates Row Key for TCA VES Message Status table - * - * Row Key = (Message Type + Decreasing Value) - * - * @param calculatorMessageType calculator message type - * - * @return row key string - */ - private static String createKey(final TCACalculatorMessageType calculatorMessageType) { - - final List<String> keyList = new LinkedList<>(); - keyList.add(calculatorMessageType.name()); - keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey()); - return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList); + } } + return tcaMessageStatusEntity; + } + + /** + * Creates Row Key for TCA VES Message Status table + * + * Row Key = (Message Type + Decreasing Value) + * + * @param calculatorMessageType calculator message type + * @return row key string + */ + private static String createKey(final TCACalculatorMessageType calculatorMessageType) { + + final List<String> keyList = new LinkedList<>(); + keyList.add(calculatorMessageType.name()); + keyList.add(PersistenceUtils.getCurrentTimeReverseSubKey()); + return Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER).join(keyList); + } } diff --git a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAVESAlertsPersister.java b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAVESAlertsPersister.java index 83eaf7f..b9f636d 100644 --- a/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAVESAlertsPersister.java +++ b/dcae-analytics-tca/src/main/java/org/openecomp/dcae/apod/analytics/tca/persistance/TCAVESAlertsPersister.java @@ -37,62 +37,62 @@ import static org.openecomp.dcae.apod.analytics.common.CDAPComponentsConstants.T import static org.openecomp.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; /** - * * @author Rajiv Singla. Creation Date: 11/16/2016. */ public abstract class TCAVESAlertsPersister { - private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsPersister.class); - - /** - * Persists Alert Message to Alerts Table - * - * @param alertMessage alert Message - * @param tcaVESAlertTable alert Table Name - */ - public static void persist(final String alertMessage, final ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertTable) { - final Date currentDate = new Date(); - final TCAVESAlertEntity alertEntity = new TCAVESAlertEntity(currentDate.getTime(), - StringEscapeUtils.unescapeJson(alertMessage)); - // row key is same as current timestamp - final String rowKey = createRowKey(currentDate); - tcaVESAlertTable.write(rowKey, alertEntity); + private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsPersister.class); - LOG.debug("Finished persisting VES Alert message ID: {} in VES Alerts table.", rowKey); - } + /** + * Persists Alert Message to Alerts Table + * + * @param alertMessage alert Message + * @param tcaVESAlertTable alert Table Name + */ + public static void persist(final String alertMessage, + final ObjectMappedTable<TCAVESAlertEntity> tcaVESAlertTable) { + final Date currentDate = new Date(); + final TCAVESAlertEntity alertEntity = new TCAVESAlertEntity(currentDate.getTime(), + StringEscapeUtils.unescapeJson(alertMessage)); + // row key is same as current timestamp + final String rowKey = createRowKey(currentDate); + tcaVESAlertTable.write(rowKey, alertEntity); + LOG.debug("Finished persisting VES Alert message ID: {} in VES Alerts table.", rowKey); + } - /** - * Creates {@link DatasetProperties} for Alerts Table - * - * @param timeToLiveSeconds alerts table Time to Live - * @return Alerts table properties - */ - public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { - try { - return ObjectMappedTableProperties.builder() - .setType(TCAVESAlertEntity.class) - .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) - .setRowKeyExploreType(Schema.Type.STRING) - .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) - .setDescription(TCA_DEFAULT_VES_ALERTS_DESCRIPTION_TABLE) - .build(); - } catch (UnsupportedTypeException e) { - final String errorMessage = "Unable to convert TCAVESAlertEntity class to Schema"; - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new IllegalArgumentException(errorMessage)); - } + /** + * Creates {@link DatasetProperties} for Alerts Table + * + * @param timeToLiveSeconds alerts table Time to Live + * @return Alerts table properties + */ + public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) { + try { + return ObjectMappedTableProperties.builder() + .setType(TCAVESAlertEntity.class) + .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME) + .setRowKeyExploreType(Schema.Type.STRING) + .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds) + .setDescription(TCA_DEFAULT_VES_ALERTS_DESCRIPTION_TABLE) + .build(); + } catch (UnsupportedTypeException e) { + final String errorMessage = "Unable to convert TCAVESAlertEntity class to Schema"; + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, new + IllegalArgumentException(errorMessage, e)); } - /** - * Creates Row Key for Alerts Table - * - * @param date current Date - * - * @return row key - */ - public static String createRowKey(final Date date) { - return String.format("%025d", date.getTime()); - } + } + + /** + * Creates Row Key for Alerts Table + * + * @param date current Date + * @return row key + */ + public static String createRowKey(final Date date) { + return String.format("%025d", date.getTime()); + } } |