diff options
Diffstat (limited to 'dcae-analytics-cdap-common')
3 files changed, 58 insertions, 6 deletions
diff --git a/dcae-analytics-cdap-common/pom.xml b/dcae-analytics-cdap-common/pom.xml index 17acbff..774f6ce 100644 --- a/dcae-analytics-cdap-common/pom.xml +++ b/dcae-analytics-cdap-common/pom.xml @@ -84,6 +84,12 @@ <artifactId>logback-classic</artifactId> </dependency> + <!-- DISTRIBUTED CACHING --> + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + </dependency> + <!-- FIND BUGS --> <dependency> <groupId>com.google.code.findbugs</groupId> diff --git a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java index 19cf9c7..57e2a59 100644 --- a/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java +++ b/dcae-analytics-cdap-common/src/main/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersister.java @@ -35,11 +35,16 @@ import org.onap.dcae.apod.analytics.model.domain.cef.EventListener; import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName; import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold; import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse; +import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; +import java.io.IOException; import java.util.Date; import java.util.List; +import java.util.Set; import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME; @@ -85,14 +90,21 @@ public abstract class TCAAlertsAbatementPersister { final MetricsPerEventName violatedMetricsPerEventName, final TCAVESResponse tcavesResponse, final String abatementTS, - final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable) { + final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable, + final Set<HostAndPort> redisHostAndPorts) { final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); final long currentTimestamp = new Date().getTime(); final String requestID = tcavesResponse.getRequestID(); final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp, requestID, abatementTS); - tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity); + + // if redis is enabled save entity in redis cluster + if (redisHostAndPorts != null) { + persistAlertAbatementEntityInRedis(redisHostAndPorts, abatementTableKey, tcaAlertsAbatementEntity); + } else { + tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity); + } LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey); @@ -101,12 +113,18 @@ public abstract class TCAAlertsAbatementPersister { public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener, final MetricsPerEventName violatedMetricsPerEventName, final ObjectMappedTable<TCAAlertsAbatementEntity> - tcaAlertsAbatementTable) { + tcaAlertsAbatementTable, + final Set<HostAndPort> redisHostAndPorts) { final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName); + + // if redis is enabled get entity from redis cluster + if (redisHostAndPorts != null) { + return getAlertAbatementEntityFromRedis(redisHostAndPorts, abatementTableKey); + } + return tcaAlertsAbatementTable.read(abatementTableKey); } - public static String createKey(final EventListener eventListener, final MetricsPerEventName violatedMetricsPerEventName) { // no null check required as all are required fields @@ -124,4 +142,32 @@ public abstract class TCAAlertsAbatementPersister { return KEY_JOINER.join(abatementKeyList); } + private static TCAAlertsAbatementEntity getAlertAbatementEntityFromRedis(final Set<HostAndPort> redisHostAndPorts, + final String abatementTableKey) { + try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) { + if (jedisCluster.exists(abatementTableKey)) { + return AnalyticsModelJsonUtils.readValue(jedisCluster.get(abatementTableKey), + TCAAlertsAbatementEntity.class); + } else { + return null; + } + } catch (IOException e) { + final String errorMessage = String.format("Unable to look up key: %s in redis cluster: %s", + abatementTableKey, redisHostAndPorts); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + + private static void persistAlertAbatementEntityInRedis(final Set<HostAndPort> redisHostAndPorts, + final String abatementTableKey, + final TCAAlertsAbatementEntity tcaAlertsAbatementEntity) { + try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) { + jedisCluster.set(abatementTableKey, AnalyticsModelJsonUtils.writeValueAsString(tcaAlertsAbatementEntity)); + } catch (IOException e) { + final String errorMessage = String.format("Unable to store key:value - %s:%s in redis cluster: %s", + abatementTableKey, tcaAlertsAbatementEntity, redisHostAndPorts); + throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); + } + } + } diff --git a/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java b/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java index 9415373..54fe870 100644 --- a/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java +++ b/dcae-analytics-cdap-common/src/test/java/org/onap/dcae/apod/analytics/cdap/common/persistance/tca/TCAAlertsAbatementPersisterTest.java @@ -101,7 +101,7 @@ public class TCAAlertsAbatementPersisterTest extends BaseAnalyticsCDAPCommonUnit public void testPersist() throws Exception { TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse, - abatementTS, alertsAbatementTable); + abatementTS, alertsAbatementTable, null); verify(alertsAbatementTable, times(1)).write(anyString(), any(TCAAlertsAbatementEntity.class)); @@ -109,7 +109,7 @@ public class TCAAlertsAbatementPersisterTest extends BaseAnalyticsCDAPCommonUnit @Test public void testLookUpByKey() throws Exception { - TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, alertsAbatementTable); + TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName, alertsAbatementTable, null); verify(alertsAbatementTable, times(1)).read(eq(EXPECTED_LOOKUP_KEY)); } |