aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSingla, Rajiv (rs153v) <rs153v@att.com>2018-03-20 13:41:33 -0400
committerSingla, Rajiv (rs153v) <rs153v@att.com>2018-03-20 13:41:53 -0400
commit28eaf8279e520aba8ab8b6db8aec151af0c58b5d (patch)
tree19d3c81928c2c2541dc386b99cd96c8bc585006c
parent7904a8783b91aae406aa96949c9d28f9948110e9 (diff)
Added Redis Support
Issue-ID: DCAEGEN2-406 Change-Id: I857cab2a82de86181dcc7558b47e656d596245cf Signed-off-by: Singla, Rajiv (rs153v) <rs153v@att.com>
-rw-r--r--dcae-analytics-cdap-common/pom.xml6
-rw-r--r--dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java54
-rw-r--r--dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java4
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java49
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java20
-rw-r--r--dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java11
-rw-r--r--dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/BaseAnalyticsCDAPTCAUnitTest.java7
-rw-r--r--dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java15
-rw-r--r--dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCATestAppPreferences.java10
-rw-r--r--dcae-analytics-common/src/main/java/org/onap/dcae/apod/analytics/common/AnalyticsConstants.java3
-rw-r--r--dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy.properties3
-rw-r--r--dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy_from_json.properties2
-rw-r--r--pom.xml8
13 files changed, 181 insertions, 11 deletions
diff --git a/dcae-analytics-cdap-common/pom.xml b/dcae-analytics-cdap-common/pom.xml
index 17acbff..774f6ce 100644
--- a/dcae-analytics-cdap-common/pom.xml
+++ b/dcae-analytics-cdap-common/pom.xml
@@ -84,6 +84,12 @@
<artifactId>logback-classic</artifactId>
</dependency>
+ <!-- DISTRIBUTED CACHING -->
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ </dependency>
+
<!-- FIND BUGS -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java
index 19cf9c7..57e2a59 100644
--- a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java
+++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java
@@ -35,11 +35,16 @@ import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import java.io.IOException;
import java.util.Date;
import java.util.List;
+import java.util.Set;
import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
@@ -85,14 +90,21 @@ public abstract class TCAAlertsAbatementPersister {
final MetricsPerEventName violatedMetricsPerEventName,
final TCAVESResponse tcavesResponse,
final String abatementTS,
- final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable) {
+ final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable,
+ final Set<HostAndPort> redisHostAndPorts) {
final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
final long currentTimestamp = new Date().getTime();
final String requestID = tcavesResponse.getRequestID();
final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp,
requestID, abatementTS);
- tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity);
+
+ // if redis is enabled save entity in redis cluster
+ if (redisHostAndPorts != null) {
+ persistAlertAbatementEntityInRedis(redisHostAndPorts, abatementTableKey, tcaAlertsAbatementEntity);
+ } else {
+ tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity);
+ }
LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey);
@@ -101,12 +113,18 @@ public abstract class TCAAlertsAbatementPersister {
public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener,
final MetricsPerEventName violatedMetricsPerEventName,
final ObjectMappedTable<TCAAlertsAbatementEntity>
- tcaAlertsAbatementTable) {
+ tcaAlertsAbatementTable,
+ final Set<HostAndPort> redisHostAndPorts) {
final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
+
+ // if redis is enabled get entity from redis cluster
+ if (redisHostAndPorts != null) {
+ return getAlertAbatementEntityFromRedis(redisHostAndPorts, abatementTableKey);
+ }
+
return tcaAlertsAbatementTable.read(abatementTableKey);
}
-
public static String createKey(final EventListener eventListener,
final MetricsPerEventName violatedMetricsPerEventName) {
// no null check required as all are required fields
@@ -124,4 +142,32 @@ public abstract class TCAAlertsAbatementPersister {
return KEY_JOINER.join(abatementKeyList);
}
+ private static TCAAlertsAbatementEntity getAlertAbatementEntityFromRedis(final Set<HostAndPort> redisHostAndPorts,
+ final String abatementTableKey) {
+ try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+ if (jedisCluster.exists(abatementTableKey)) {
+ return AnalyticsModelJsonUtils.readValue(jedisCluster.get(abatementTableKey),
+ TCAAlertsAbatementEntity.class);
+ } else {
+ return null;
+ }
+ } catch (IOException e) {
+ final String errorMessage = String.format("Unable to look up key: %s in redis cluster: %s",
+ abatementTableKey, redisHostAndPorts);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ private static void persistAlertAbatementEntityInRedis(final Set<HostAndPort> redisHostAndPorts,
+ final String abatementTableKey,
+ final TCAAlertsAbatementEntity tcaAlertsAbatementEntity) {
+ try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+ jedisCluster.set(abatementTableKey, AnalyticsModelJsonUtils.writeValueAsString(tcaAlertsAbatementEntity));
+ } catch (IOException e) {
+ final String errorMessage = String.format("Unable to store key:value - %s:%s in redis cluster: %s",
+ abatementTableKey, tcaAlertsAbatementEntity, redisHostAndPorts);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
}
diff --git a/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java b/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java
index 9415373..54fe870 100644
--- a/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java
+++ b/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java
@@ -101,7 +101,7 @@ public class TCAAlertsAbatementPersisterTest extends BaseAnalyticsCDAPCommonUnit
public void testPersist() throws Exception {
TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
- abatementTS, alertsAbatementTable);
+ abatementTS, alertsAbatementTable, null);
verify(alertsAbatementTable, times(1)).write(anyString(),
any(TCAAlertsAbatementEntity.class));
@@ -109,7 +109,7 @@ public class TCAAlertsAbatementPersisterTest extends BaseAnalyticsCDAPCommonUnit
@Test
public void testLookUpByKey() throws Exception {
- TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, alertsAbatementTable);
+ TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, alertsAbatementTable, null);
verify(alertsAbatementTable, times(1)).read(eq(EXPECTED_LOOKUP_KEY));
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
index 759c3d5..f240556 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowlet.java
@@ -33,6 +33,9 @@ import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOu
import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
@@ -41,9 +44,16 @@ import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.exceptions.JedisConnectionException;
import java.io.IOException;
import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
/**
* Flowlet responsible to sending out abatement alerts
@@ -57,6 +67,8 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
@Property
private final String tcaAlertsAbatementTableName;
+ private Set<HostAndPort> redisHostAndPorts = null;
+
@Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
protected OutputEmitter<String> alertsAbatementOutputEmitter;
@@ -76,6 +88,16 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
public void initialize(FlowletContext flowletContext) throws Exception {
super.initialize(flowletContext);
tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+ // Parse runtime arguments
+ final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
+ if(tcaAppPreferences.getEnableRedisCaching()) {
+ final String redisHosts = tcaAppPreferences.getRedisHosts();
+ LOG.info("Redis Distributed Caching is enabled for abated alerts with Redis Hosts: {}", redisHosts);
+ redisHostAndPorts = getRedisHostsAndPorts(redisHosts);
+ checkRedisConnection(redisHostAndPorts);
+ } else {
+ LOG.info("Redis Distributed caching is disabled for abated alerts");
+ }
}
@ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
@@ -106,7 +128,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
- null, tcaAlertsAbatementTable);
+ null, tcaAlertsAbatementTable, redisHostAndPorts);
LOG.debug("Emitting ONSET alert: {}", alertMessageString);
alertsAbatementOutputEmitter.emit(alertMessageString);
break;
@@ -116,7 +138,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
- tcaAlertsAbatementTable);
+ tcaAlertsAbatementTable, redisHostAndPorts);
if (previousAlertsAbatementEntry != null) {
@@ -136,7 +158,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
// save new Abatement alert sent timestamp in table
TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
- Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+ Long.toString(newAbatementSentTS), tcaAlertsAbatementTable, redisHostAndPorts);
// Set request id to be same as previous ONSET event request ID
tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
@@ -166,4 +188,25 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
}
+ private static Set<HostAndPort> getRedisHostsAndPorts(final String redisHosts) {
+ final LinkedHashSet<HostAndPort> hostAndPorts = new LinkedHashSet<>();
+ final String[] redisHostsString = redisHosts.split(",");
+ for (String redisHostString : redisHostsString) {
+ hostAndPorts.add(HostAndPort.parseString(redisHostString.trim()));
+ }
+ return hostAndPorts;
+ }
+
+ private static void checkRedisConnection(final Set<HostAndPort> redisHostAndPorts) {
+ LOG.info("Checking Redis Connection for Redis Hosts: {}", redisHostAndPorts);
+ try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+ final Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
+ jedisCluster.get("testKey");
+ LOG.info("Confirmed redis cluster Nodes: {}", clusterNodes.keySet());
+ } catch (JedisConnectionException | IOException e) {
+ LOG.error("Unable to make Redis connection for given redisHosts: {}", redisHostAndPorts);
+ throw new DCAEAnalyticsRuntimeException("No Redis Connection", LOG, e);
+ }
+ }
+
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
index b55ab4f..9f10d8c 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCAAppPreferences.java
@@ -108,6 +108,12 @@ public class TCAAppPreferences implements CDAPAppPreferences {
protected String aaiEnrichmentProxyURL;
+
+ // Redis Distributed caching
+ protected Boolean enableRedisCaching;
+
+ protected String redisHosts;
+
/**
* Default constructor to setup default values for TCA App Preferences
*/
@@ -123,11 +129,15 @@ public class TCAAppPreferences implements CDAPAppPreferences {
enableAlertCEFFormat = AnalyticsConstants.TCA_DEFAULT_ENABLE_CEF_FORMATTED_ALERT;
+ // aai
enableAAIEnrichment = AnalyticsConstants.TCA_DEFAULT_ENABLE_AAI_ENRICHMENT;
aaiEnrichmentIgnoreSSLCertificateErrors =
AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_IGNORE_SSL_CERTIFICATE_ERRORS;
aaiEnrichmentProxyURL = AnalyticsConstants.TCA_DEFAULT_AAI_ENRICHMENT_PROXY_URL;
+ // redis
+ enableRedisCaching = AnalyticsConstants.TCA_DEFAULT_ENABLE_REDIS_CACHING;
+
}
public String getSubscriberHostName() {
@@ -311,6 +321,14 @@ public class TCAAppPreferences implements CDAPAppPreferences {
return aaiEnrichmentProxyURL;
}
+ public Boolean getEnableRedisCaching() {
+ return enableRedisCaching;
+ }
+
+ public String getRedisHosts() {
+ return redisHosts;
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this)
@@ -344,6 +362,8 @@ public class TCAAppPreferences implements CDAPAppPreferences {
.add("aaiVNFEnrichmentAPIPath", aaiVNFEnrichmentAPIPath)
.add("aaiVMEnrichmentAPIPath", aaiVMEnrichmentAPIPath)
.add("aaiEnrichmentProxyEnabled", aaiEnrichmentProxyURL == null ? "false" : "true")
+ .add("enableRedisCaching", enableRedisCaching)
+ .add("redisHosts", redisHosts)
.toString();
}
}
diff --git a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java
index 261b74d..0eb96c4 100644
--- a/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java
+++ b/dcae-analytics-cdap-tca/src/main/java/org/onap/dcae/apod/analytics/cdap/tca/validator/TCAPreferencesValidator.java
@@ -79,6 +79,17 @@ public class TCAPreferencesValidator implements CDAPAppSettingsValidator<TCAAppP
}
}
+ final Boolean enableRedisCaching = appPreferences.getEnableRedisCaching();
+
+ // if redis distributed caching is enabled then redis Hosts must be provided
+ if(enableRedisCaching) {
+ final String redisHosts = appPreferences.getRedisHosts();
+ if(isEmpty(redisHosts)) {
+ validationResponse.addErrorMessage("redisHosts",
+ "Redis Caching is enabled but no redis hosts are provided");
+ }
+ }
+
return validationResponse;
}
}
diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/BaseAnalyticsCDAPTCAUnitTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/BaseAnalyticsCDAPTCAUnitTest.java
index eff7374..37458fa 100644
--- a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/BaseAnalyticsCDAPTCAUnitTest.java
+++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/BaseAnalyticsCDAPTCAUnitTest.java
@@ -221,6 +221,9 @@ public abstract class BaseAnalyticsCDAPTCAUnitTest extends BaseDCAEAnalyticsUnit
tcaTestAppPreferences.setAaiEnrichmentIgnoreSSLCertificateErrors(true);
tcaTestAppPreferences.setAaiVMEnrichmentAPIPath("VM_ENRICHMENT_PATH");
tcaTestAppPreferences.setAaiVNFEnrichmentAPIPath("VNF_ENRICHMENT_PATH");
+
+ tcaTestAppPreferences.setEnableRedisCaching(false);
+ tcaTestAppPreferences.setRedisHosts("127.0.0.1:6379");
return tcaTestAppPreferences;
}
@@ -248,6 +251,10 @@ public abstract class BaseAnalyticsCDAPTCAUnitTest extends BaseDCAEAnalyticsUnit
preference.put("publisherMaxBatchSize", "1000");
preference.put("publisherMaxRecoveryQueueSize", "100");
preference.put("publisherPollingInterval", "6000");
+
+ preference.put("enableRedisCaching", "false");
+ preference.put("redisHosts", "127.0.0.1:6379");
+
return preference;
}
diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java
index 7755a13..5df12f2 100644
--- a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java
+++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/flowlet/TCAVESAlertsAbatementFlowletTest.java
@@ -20,11 +20,13 @@
package org.onap.dcae.apod.analytics.cdap.tca.flowlet;
+import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
import co.cask.cdap.api.flow.flowlet.FlowletContext;
import co.cask.cdap.api.flow.flowlet.OutputEmitter;
import com.google.common.collect.ImmutableList;
import org.junit.Test;
+import org.mockito.Mockito;
import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
@@ -36,6 +38,7 @@ import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
+import java.io.IOException;
import java.util.Date;
import java.util.List;
@@ -99,6 +102,7 @@ public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTe
final FlowletContext mockFlowletContext = mock(FlowletContext.class);
final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
+ configureFlowletContext(mockFlowletContext);
tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
@@ -125,6 +129,7 @@ public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTe
final FlowletContext mockFlowletContext = mock(FlowletContext.class);
final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
+ configureFlowletContext(mockFlowletContext);
tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
@@ -154,6 +159,7 @@ public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTe
final FlowletContext mockFlowletContext = mock(FlowletContext.class);
final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
+ configureFlowletContext(mockFlowletContext);
tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
@@ -185,8 +191,8 @@ public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTe
final FlowletContext mockFlowletContext = mock(FlowletContext.class);
final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
+ configureFlowletContext(mockFlowletContext);
tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
-
doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
when(mockObjectMappedTable.read(any(String.class))).thenReturn(null);
@@ -248,4 +254,11 @@ public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTe
return thresholdCalculatorOutput;
}
+ private void configureFlowletContext(final FlowletContext mockFlowletContext) throws IOException {
+ when(mockFlowletContext.getRuntimeArguments()).thenReturn(getPreferenceMap());
+ ApplicationSpecification mockApplicationSpecification = Mockito.mock(ApplicationSpecification.class);
+ when(mockApplicationSpecification.getConfiguration()).thenReturn(fromStream(TCA_APP_CONFIG_FILE_LOCATION));
+ when(mockFlowletContext.getApplicationSpecification()).thenReturn(mockApplicationSpecification);
+ }
+
}
diff --git a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCATestAppPreferences.java b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCATestAppPreferences.java
index 729630c..ad299b9 100644
--- a/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCATestAppPreferences.java
+++ b/dcae-analytics-cdap-tca/src/test/java/org/onap/dcae/apod/analytics/cdap/tca/settings/TCATestAppPreferences.java
@@ -173,4 +173,14 @@ public class TCATestAppPreferences extends TCAAppPreferences {
public void setAaiVMEnrichmentAPIPath(String aaiVMEnrichmentAPIPath) {
this.aaiVMEnrichmentAPIPath = aaiVMEnrichmentAPIPath;
}
+
+ public void setEnableRedisCaching(final Boolean enableRedisCaching) {
+ this.enableRedisCaching = enableRedisCaching;
+ }
+
+ public void setRedisHosts(final String redisHosts) {
+ this.redisHosts = redisHosts;
+ }
+
+
}
diff --git a/dcae-analytics-common/src/main/java/org/onap/dcae/apod/analytics/common/AnalyticsConstants.java b/dcae-analytics-common/src/main/java/org/onap/dcae/apod/analytics/common/AnalyticsConstants.java
index 62a6fab..3344eec 100644
--- a/dcae-analytics-common/src/main/java/org/onap/dcae/apod/analytics/common/AnalyticsConstants.java
+++ b/dcae-analytics-common/src/main/java/org/onap/dcae/apod/analytics/common/AnalyticsConstants.java
@@ -159,6 +159,9 @@ public abstract class AnalyticsConstants {
public static final String TCA_DEFAULT_AAI_ENRICHMENT_PROXY_URL = null;
+ // TCA Redis Distributed Caching default
+ public static final Boolean TCA_DEFAULT_ENABLE_REDIS_CACHING = true;
+
private AnalyticsConstants() {
diff --git a/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy.properties b/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy.properties
index 5465488..b5622fa 100644
--- a/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy.properties
+++ b/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy.properties
@@ -33,6 +33,9 @@ enableAlertCEFFormat=false
domain=measurementsForVfScaling
+enableRedisCaching=false
+redisHosts=127.0.0.1:7000
+
configuration.metricsPerEventName.Mfvs_eNodeB_RANKPI.policy.eventName=Mfvs_eNodeB_RANKPI
configuration.metricsPerEventName.Mfvs_eNodeB_RANKPI.policyName=configuration.dcae.microservice.tca.xml
configuration.metricsPerEventName.Mfvs_eNodeB_RANKPI.policyVersion=v0.0.1
diff --git a/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy_from_json.properties b/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy_from_json.properties
index 92354f5..9b31468 100644
--- a/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy_from_json.properties
+++ b/dcae-analytics-test/src/main/resources/data/properties/tca_controller_policy_from_json.properties
@@ -45,3 +45,5 @@ aaiEnrichmentProxyURL=http://username:password@proxyhost.com:8080
tca_policy={\"domain\":\"measurementsForVfScaling\",\"metricsPerEventName\":[{\"eventName\":\"Mfvs_eNodeB_RANKPI\",\"controlLoopSchemaType\":\"VNF\",\"policyScope\":\"resource=vFirewall;type=configuration\",\"policyName\":\"configuration.dcae.microservice.tca.xml\",\"policyVersion\":\"v0.0.1\",\"thresholds\":[{\"closedLoopControlName\":\"CL-FRWL-LOW-TRAFFIC-SIG-d925ed73-8231-4d02-9545-db4e101f88f8\",\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"fieldPath\":\"$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated\",\"thresholdValue\":4000,\"direction\":\"LESS_OR_EQUAL\",\"severity\":\"MAJOR\"},{\"closedLoopControlName\":\"CL-FRWL-HIGH-TRAFFIC-SIG-EA36FE84-9342-5E13-A656-EC5F21309A09\",\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"fieldPath\":\"$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated\",\"thresholdValue\":20000,\"direction\":\"GREATER_OR_EQUAL\",\"severity\":\"CRITICAL\"}]},{\"eventName\":\"vLoadBalancer\",\"controlLoopSchemaType\":\"VNF\",\"policyScope\":\"resource=vLoadBalancer;type=configuration\",\"policyName\":\"configuration.dcae.microservice.tca.xml\",\"policyVersion\":\"v0.0.1\",\"thresholds\":[{\"closedLoopControlName\":\"CL-LBAL-LOW-TRAFFIC-SIG-FB480F95-A453-6F24-B767-FD703241AB1A\",\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"fieldPath\":\"$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated\",\"thresholdValue\":500,\"direction\":\"LESS_OR_EQUAL\",\"severity\":\"MAJOR\"},{\"closedLoopControlName\":\"CL-LBAL-LOW-TRAFFIC-SIG-0C5920A6-B564-8035-C878-0E814352BC2B\",\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"fieldPath\":\"$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated\",\"thresholdValue\":5000,\"direction\":\"GREATER_OR_EQUAL\",\"severity\":\"CRITICAL\"}]}]}
+enableRedisCaching=false
+redisHosts=127.0.0.1:7000
diff --git a/pom.xml b/pom.xml
index 2f92260..23f803e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
<findbugs.jsr305.version>2.0.1</findbugs.jsr305.version>
<findbugs.annotations.version>3.0.0</findbugs.annotations.version>
<lombok.version>1.16.10</lombok.version>
-
+ <jedis.version>2.9.0</jedis.version>
<json.path.version>2.2.0</json.path.version>
<quartz.version>2.2.0</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
@@ -436,6 +436,12 @@
<scope>provided</scope>
</dependency>
+ <!-- DISTRIBUTED CACHING -->
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>${jedis.version}</version>
+ </dependency>
<!-- DEPENDENCY INJECTION -->
<dependency>