diff options
author | Singla, Rajiv (rs153v) <rs153v@att.com> | 2018-03-20 13:41:33 -0400 |
---|---|---|
committer | Singla, Rajiv (rs153v) <rs153v@att.com> | 2018-03-20 13:41:53 -0400 |
commit | 28eaf8279e520aba8ab8b6db8aec151af0c58b5d (patch) | |
tree | 19d3c81928c2c2541dc386b99cd96c8bc585006c /dcae-analytics-cdap-tca/src/main/java/org | |
parent | 7904a8783b91aae406aa96949c9d28f9948110e9 (diff) |
Added Redis Support
Issue-ID: DCAEGEN2-406
Change-Id: I857cab2a82de86181dcc7558b47e656d596245cf
Signed-off-by: Singla, Rajiv (rs153v) <rs153v@att.com>
Diffstat (limited to 'dcae-analytics-cdap-tca/src/main/java/org')
3 files changed, 77 insertions, 3 deletions
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java index 759c3d5..f240556 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java @@ -33,6 +33,9 @@ import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOu import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException; import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity; import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister; +import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences; +import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus; import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; @@ -41,9 +44,16 @@ import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; import org.onap.dcae.apod.analytics.tca.utils.TCAUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.exceptions.JedisConnectionException; import java.io.IOException; import java.util.Date; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; /** * Flowlet responsible to sending out abatement alerts @@ -57,6 +67,8 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { @Property private final String tcaAlertsAbatementTableName; + private Set<HostAndPort> redisHostAndPorts = null; + @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT) protected OutputEmitter<String> alertsAbatementOutputEmitter; @@ -76,6 +88,16 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { public void initialize(FlowletContext flowletContext) throws Exception { super.initialize(flowletContext); tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName); + // Parse runtime arguments + final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext); + if(tcaAppPreferences.getEnableRedisCaching()) { + final String redisHosts = tcaAppPreferences.getRedisHosts(); + LOG.info("Redis Distributed Caching is enabled for abated alerts with Redis Hosts: {}", redisHosts); + redisHostAndPorts = getRedisHostsAndPorts(redisHosts); + checkRedisConnection(redisHostAndPorts); + } else { + LOG.info("Redis Distributed caching is disabled for abated alerts"); + } } @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT) @@ -106,7 +128,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage); TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, - null, tcaAlertsAbatementTable); + null, tcaAlertsAbatementTable, redisHostAndPorts); LOG.debug("Emitting ONSET alert: {}", alertMessageString); alertsAbatementOutputEmitter.emit(alertMessageString); break; @@ -116,7 +138,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold); final TCAAlertsAbatementEntity previousAlertsAbatementEntry = TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, - tcaAlertsAbatementTable); + tcaAlertsAbatementTable, redisHostAndPorts); if (previousAlertsAbatementEntry != null) { @@ -136,7 +158,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { // save new Abatement alert sent timestamp in table TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, - Long.toString(newAbatementSentTS), tcaAlertsAbatementTable); + Long.toString(newAbatementSentTS), tcaAlertsAbatementTable, redisHostAndPorts); // Set request id to be same as previous ONSET event request ID tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId()); @@ -166,4 +188,25 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet { } + private static Set<HostAndPort> getRedisHostsAndPorts(final String redisHosts) { + final LinkedHashSet<HostAndPort> hostAndPorts = new LinkedHashSet<>(); + final String[] redisHostsString = redisHosts.split(","); + for (String redisHostString : redisHostsString) { + hostAndPorts.add(HostAndPort.parseString(redisHostString.trim())); + } + return hostAndPorts; + } + + private static void checkRedisConnection(final Set<HostAndPort> redisHostAndPorts) { + LOG.info("Checking Redis Connection for Redis Hosts: {}", redisHostAndPorts); + try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) { + final Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes(); + jedisCluster.get("testKey"); + LOG.info("Confirmed redis cluster Nodes: {}", clusterNodes.keySet()); + } catch (JedisConnectionException | IOException e) { + LOG.error("Unable to make Redis connection for given redisHosts: {}", redisHostAndPorts); + throw new DCAEAnalyticsRuntimeException("No Redis Connection", LOG, e); + } + } + } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java index b55ab4f..9f10d8c 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java @@ -108,6 +108,12 @@ public class TCAAppPreferences implements CDAPAppPreferences { protected String aaiEnrichmentProxyURL; + + // Redis Distributed caching + protected Boolean enableRedisCaching; + + protected String redisHosts; + /** * Default constructor to setup default values for TCA App Preferences */ @@ -123,11 +129,15 @@ public class TCAAppPreferences implements CDAPAppPreferences { enableAlertCEFFormat = AnalyticsConstants.TCA_DEFAULT_ENABLE_CEF_FORMATTED_ALERT; + // aai enableAAIEnrichment = AnalyticsConstants.TCA_DEFAULT_ENABLE_AAI_ENRICHMENT; aaiEnrichmentIgnoreSSLCertificateErrors = AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_IGNORE_SSL_CERTIFICATE_ERRORS; aaiEnrichmentProxyURL = AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_PROXY_URL; + // redis + enableRedisCaching = AnalyticsConstants.TCA_DEFAULT_ENABLE_REDIS_CACHING; + } public String getSubscriberHostName() { @@ -311,6 +321,14 @@ public class TCAAppPreferences implements CDAPAppPreferences { return aaiEnrichmentProxyURL; } + public Boolean getEnableRedisCaching() { + return enableRedisCaching; + } + + public String getRedisHosts() { + return redisHosts; + } + @Override public String toString() { return Objects.toStringHelper(this) @@ -344,6 +362,8 @@ public class TCAAppPreferences implements CDAPAppPreferences { .add("aaiVNFEnrichmentAPIPath", aaiVNFEnrichmentAPIPath) .add("aaiVMEnrichmentAPIPath", aaiVMEnrichmentAPIPath) .add("aaiEnrichmentProxyEnabled", aaiEnrichmentProxyURL == null ? "false" : "true") + .add("enableRedisCaching", enableRedisCaching) + .add("redisHosts", redisHosts) .toString(); } } diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java index 261b74d..0eb96c4 100644 --- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java +++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java @@ -79,6 +79,17 @@ public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppP } } + final Boolean enableRedisCaching = appPreferences.getEnableRedisCaching(); + + // if redis distributed caching is enabled then redis Hosts must be provided + if(enableRedisCaching) { + final String redisHosts = appPreferences.getRedisHosts(); + if(isEmpty(redisHosts)) { + validationResponse.addErrorMessage("redisHosts", + "Redis Caching is enabled but no redis hosts are provided"); + } + } + return validationResponse; } } |