aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java')
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java49
1 files changed, 46 insertions, 3 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
index 759c3d5..f240556 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
@@ -33,6 +33,9 @@ import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOu
import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
@@ -41,9 +44,16 @@ import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.exceptions.JedisConnectionException;
import java.io.IOException;
import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
/**
* Flowlet responsible to sending out abatement alerts
@@ -57,6 +67,8 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
@Property
private final String tcaAlertsAbatementTableName;
+ private Set<HostAndPort> redisHostAndPorts = null;
+
@Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
protected OutputEmitter<String> alertsAbatementOutputEmitter;
@@ -76,6 +88,16 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
public void initialize(FlowletContext flowletContext) throws Exception {
super.initialize(flowletContext);
tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
+ if(tcaAppPreferences.getEnableRedisCaching()) {
+ final String redisHosts = tcaAppPreferences.getRedisHosts();
+ LOG.info("Redis Distributed Caching is enabled for abated alerts with Redis Hosts: {}", redisHosts);
+ redisHostAndPorts = getRedisHostsAndPorts(redisHosts);
+ checkRedisConnection(redisHostAndPorts);
+ } else {
+ LOG.info("Redis Distributed caching is disabled for abated alerts");
+ }
}
@ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
@@ -106,7 +128,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
- null, tcaAlertsAbatementTable);
+ null, tcaAlertsAbatementTable, redisHostAndPorts);
LOG.debug("Emitting ONSET alert: {}", alertMessageString);
alertsAbatementOutputEmitter.emit(alertMessageString);
break;
@@ -116,7 +138,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
- tcaAlertsAbatementTable);
+ tcaAlertsAbatementTable, redisHostAndPorts);
if (previousAlertsAbatementEntry != null) {
@@ -136,7 +158,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
// save new Abatement alert sent timestamp in table
TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
- Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+ Long.toString(newAbatementSentTS), tcaAlertsAbatementTable, redisHostAndPorts);
// Set request id to be same as previous ONSET event request ID
tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
@@ -166,4 +188,25 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
}
+ private static Set<HostAndPort> getRedisHostsAndPorts(final String redisHosts) {
+ final LinkedHashSet<HostAndPort> hostAndPorts = new LinkedHashSet<>();
+ final String[] redisHostsString = redisHosts.split(",");
+ for (String redisHostString : redisHostsString) {
+ hostAndPorts.add(HostAndPort.parseString(redisHostString.trim()));
+ }
+ return hostAndPorts;
+ }
+
+ private static void checkRedisConnection(final Set<HostAndPort> redisHostAndPorts) {
+ LOG.info("Checking Redis Connection for Redis Hosts: {}", redisHostAndPorts);
+ try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+ final Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
+ jedisCluster.get("testKey");
+ LOG.info("Confirmed redis cluster Nodes: {}", clusterNodes.keySet());
+ } catch (JedisConnectionException | IOException e) {
+ LOG.error("Unable to make Redis connection for given redisHosts: {}", redisHostAndPorts);
+ throw new DCAEAnalyticsRuntimeException("No Redis Connection", LOG, e);
+ }
+ }
+
}