diff options
Diffstat (limited to 'catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java')
-rw-r--r-- | catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java | 359 |
1 files changed, 172 insertions, 187 deletions
diff --git a/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java b/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java index 028247d75a..624f9b44f2 100644 --- a/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java +++ b/catalog-dao/src/main/java/org/openecomp/sdc/be/dao/cassandra/CassandraClient.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,205 +17,190 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.openecomp.sdc.be.dao.cassandra; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Session; import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.policies.*; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import fj.data.Either; +import java.util.List; +import javax.annotation.PreDestroy; import org.apache.commons.lang3.tuple.ImmutablePair; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.common.log.wrappers.Logger; import org.springframework.stereotype.Component; -import javax.annotation.PreDestroy; -import java.util.List; - @Component("cassandra-client") public class CassandraClient { - private static Logger logger = Logger.getLogger(CassandraClient.class.getName()); - - private Cluster cluster; - private boolean isConnected; - - public CassandraClient() { - super(); - isConnected = false; - List<String> cassandraHosts = null; - try { - cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .getCassandraHosts(); - Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .getCassandraPort(); - Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration() - .getCassandraConfig().getReconnectTimeout(); - logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout); - Cluster.Builder clusterBuilder = Cluster.builder() - .withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout)) - .withRetryPolicy(DefaultRetryPolicy.INSTANCE); - - cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort)); - setSocketOptions(clusterBuilder); - enableAuthentication(clusterBuilder); - enableSsl(clusterBuilder); - setLocalDc(clusterBuilder); - - cluster = clusterBuilder.build(); - isConnected = true; - } catch (Exception e) { - logger.info("** CassandraClient isn't connected to {}", cassandraHosts); - } - - logger.info("** CassandraClient created"); - } - - private void setSocketOptions(Cluster.Builder clusterBuilder) { - SocketOptions socketOptions =new SocketOptions(); - Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketConnectTimeout(); - if( socketConnectTimeout!=null ){ - logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .",socketConnectTimeout); - socketOptions.setConnectTimeoutMillis(socketConnectTimeout); - } - Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout(); - if( socketReadTimeout != null ){ - logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .",socketReadTimeout); - socketOptions.setReadTimeoutMillis(socketReadTimeout); - } - clusterBuilder.withSocketOptions(socketOptions); - } - - private void setLocalDc(Cluster.Builder clusterBuilder) { - String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .getLocalDataCenter(); - if (localDataCenter != null) { - logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.", - localDataCenter); - LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build()); - clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy); - } else { - logger.info( - "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization"); - } - } - - private void enableSsl(Cluster.Builder clusterBuilder) { - boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl(); - if (ssl) { - String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration() - .getCassandraConfig().getTruststorePath(); - String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration() - .getCassandraConfig().getTruststorePassword(); - if (truststorePath == null || truststorePassword == null) { - logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied."); - } else { - System.setProperty("javax.net.ssl.trustStore", truststorePath); - System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword); - clusterBuilder.withSSL(); - } - - } - } - - private void enableAuthentication(Cluster.Builder clusterBuilder) { - boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .isAuthenticate(); - if (authenticate) { - String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .getUsername(); - String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() - .getPassword(); - if (username == null || password == null) { - logger.error("authentication is enabled but username or password were not supplied."); - } else { - clusterBuilder.withCredentials(username, password); - } - - } - } - - /** - * - * @param keyspace - * - key space to connect - * @return - */ - public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) { - if (cluster != null) { - try { - Session session = cluster.connect(keyspace); - if (session != null) { - MappingManager manager = new MappingManager(session); - return Either.left(new ImmutablePair<>(session, manager)); - } else { - return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED); - } - } catch (Throwable e) { - logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e); - return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED); - } - } - return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED); - } - - public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) { - if (!isConnected) { - return CassandraOperationStatus.CLUSTER_NOT_CONNECTED; - } - try { - Mapper<T> mapper = manager.mapper(clazz); - mapper.save(entity); - } catch (Exception e) { - logger.debug("Failed to save entity [{}], error :", entity, e); - return CassandraOperationStatus.GENERAL_ERROR; - } - return CassandraOperationStatus.OK; - } - - public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) { - if (!isConnected) { - return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED); - } - try { - Mapper<T> mapper = manager.mapper(clazz); - T result = mapper.get(id); - if (result == null) { - return Either.right(CassandraOperationStatus.NOT_FOUND); - } - return Either.left(result); - } catch (Exception e) { - logger.debug("Failed to get by Id [{}], error :", id, e); - return Either.right(CassandraOperationStatus.GENERAL_ERROR); - } - } - - public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) { - if (!isConnected) { - return CassandraOperationStatus.CLUSTER_NOT_CONNECTED; - } - try { - Mapper<T> mapper = manager.mapper(clazz); - mapper.delete(id); - } catch (Exception e) { - logger.debug("Failed to delete by id [{}], error :", id, e); - return CassandraOperationStatus.GENERAL_ERROR; - } - return CassandraOperationStatus.OK; - } - - public boolean isConnected() { - return isConnected; - } - @PreDestroy - public void closeClient() { - if (isConnected) { - cluster.close(); - } - logger.info("** CassandraClient cluster closed"); - } + private static Logger logger = Logger.getLogger(CassandraClient.class.getName()); + private Cluster cluster; + private boolean isConnected; + + public CassandraClient() { + super(); + isConnected = false; + List<String> cassandraHosts = null; + try { + cassandraHosts = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getCassandraHosts(); + Integer cassandraPort = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getCassandraPort(); + Long reconnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getReconnectTimeout(); + logger.debug("creating cluster to hosts:{} port:{} with reconnect timeout:{}", cassandraHosts, cassandraPort, reconnectTimeout); + Cluster.Builder clusterBuilder = Cluster.builder().withReconnectionPolicy(new ConstantReconnectionPolicy(reconnectTimeout)) + .withRetryPolicy(DefaultRetryPolicy.INSTANCE); + cassandraHosts.forEach(host -> clusterBuilder.addContactPoint(host).withPort(cassandraPort)); + setSocketOptions(clusterBuilder); + enableAuthentication(clusterBuilder); + enableSsl(clusterBuilder); + setLocalDc(clusterBuilder); + cluster = clusterBuilder.build(); + isConnected = true; + } catch (Exception e) { + logger.info("** CassandraClient isn't connected to {}", cassandraHosts); + } + logger.info("** CassandraClient created"); + } + + private void setSocketOptions(Cluster.Builder clusterBuilder) { + SocketOptions socketOptions = new SocketOptions(); + Integer socketConnectTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() + .getSocketConnectTimeout(); + if (socketConnectTimeout != null) { + logger.info("SocketConnectTimeout was provided, setting Cassandra client to use SocketConnectTimeout: {} .", socketConnectTimeout); + socketOptions.setConnectTimeoutMillis(socketConnectTimeout); + } + Integer socketReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getSocketReadTimeout(); + if (socketReadTimeout != null) { + logger.info("SocketReadTimeout was provided, setting Cassandra client to use SocketReadTimeout: {} .", socketReadTimeout); + socketOptions.setReadTimeoutMillis(socketReadTimeout); + } + clusterBuilder.withSocketOptions(socketOptions); + } + + private void setLocalDc(Cluster.Builder clusterBuilder) { + String localDataCenter = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getLocalDataCenter(); + if (localDataCenter != null) { + logger.info("localDatacenter was provided, setting Cassndra clint to use datacenter: {} as local.", localDataCenter); + LoadBalancingPolicy tokenAwarePolicy = new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(localDataCenter).build()); + clusterBuilder.withLoadBalancingPolicy(tokenAwarePolicy); + } else { + logger.info( + "localDatacenter was provided, the driver will use the datacenter of the first contact point that was reached at initialization"); + } + } + + private void enableSsl(Cluster.Builder clusterBuilder) { + boolean ssl = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isSsl(); + if (ssl) { + String truststorePath = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getTruststorePath(); + String truststorePassword = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig() + .getTruststorePassword(); + if (truststorePath == null || truststorePassword == null) { + logger.error("ssl is enabled but truststorePath or truststorePassword were not supplied."); + } else { + System.setProperty("javax.net.ssl.trustStore", truststorePath); + System.setProperty("javax.net.ssl.trustStorePassword", truststorePassword); + clusterBuilder.withSSL(); + } + } + } + + private void enableAuthentication(Cluster.Builder clusterBuilder) { + boolean authenticate = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().isAuthenticate(); + if (authenticate) { + String username = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getUsername(); + String password = ConfigurationManager.getConfigurationManager().getConfiguration().getCassandraConfig().getPassword(); + if (username == null || password == null) { + logger.error("authentication is enabled but username or password were not supplied."); + } else { + clusterBuilder.withCredentials(username, password); + } + } + } + + /** + * @param keyspace - key space to connect + * @return + */ + public Either<ImmutablePair<Session, MappingManager>, CassandraOperationStatus> connect(String keyspace) { + if (cluster != null) { + try { + Session session = cluster.connect(keyspace); + if (session != null) { + MappingManager manager = new MappingManager(session); + return Either.left(new ImmutablePair<>(session, manager)); + } else { + return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED); + } + } catch (Throwable e) { + logger.debug("Failed to connect to keyspace [{}], error :", keyspace, e); + return Either.right(CassandraOperationStatus.KEYSPACE_NOT_CONNECTED); + } + } + return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED); + } + + public <T> CassandraOperationStatus save(T entity, Class<T> clazz, MappingManager manager) { + if (!isConnected) { + return CassandraOperationStatus.CLUSTER_NOT_CONNECTED; + } + try { + Mapper<T> mapper = manager.mapper(clazz); + mapper.save(entity); + } catch (Exception e) { + logger.debug("Failed to save entity [{}], error :", entity, e); + return CassandraOperationStatus.GENERAL_ERROR; + } + return CassandraOperationStatus.OK; + } + + public <T> Either<T, CassandraOperationStatus> getById(String id, Class<T> clazz, MappingManager manager) { + if (!isConnected) { + return Either.right(CassandraOperationStatus.CLUSTER_NOT_CONNECTED); + } + try { + Mapper<T> mapper = manager.mapper(clazz); + T result = mapper.get(id); + if (result == null) { + return Either.right(CassandraOperationStatus.NOT_FOUND); + } + return Either.left(result); + } catch (Exception e) { + logger.debug("Failed to get by Id [{}], error :", id, e); + return Either.right(CassandraOperationStatus.GENERAL_ERROR); + } + } + + public <T> CassandraOperationStatus delete(String id, Class<T> clazz, MappingManager manager) { + if (!isConnected) { + return CassandraOperationStatus.CLUSTER_NOT_CONNECTED; + } + try { + Mapper<T> mapper = manager.mapper(clazz); + mapper.delete(id); + } catch (Exception e) { + logger.debug("Failed to delete by id [{}], error :", id, e); + return CassandraOperationStatus.GENERAL_ERROR; + } + return CassandraOperationStatus.OK; + } + + public boolean isConnected() { + return isConnected; + } + + @PreDestroy + public void closeClient() { + if (isConnected) { + cluster.close(); + } + logger.info("** CassandraClient cluster closed"); + } } |