summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java')
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java672
1 files changed, 672 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java
new file mode 100644
index 0000000..f73ec15
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java
@@ -0,0 +1,672 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.ColumnListMutation;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.Host;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.RetryBackoffStrategy;
+import com.netflix.astyanax.connectionpool.SSLConnectionContext;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.connectionpool.impl.ExponentialRetryBackoffStrategy;
+import com.netflix.astyanax.connectionpool.impl.SimpleAuthenticationCredentials;
+import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
+import com.netflix.astyanax.ddl.KeyspaceDefinition;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.retry.RetryPolicy;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.Entry;
+import com.thinkaurelius.titan.diskstorage.EntryMetaData;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StoreMetaData;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+@PreInitializeConfigOptions
+public class AstyanaxStoreManager extends AbstractCassandraStoreManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AstyanaxStoreManager.class);
+
+ //################### ASTYANAX SPECIFIC CONFIGURATION OPTIONS ######################
+
+ public static final ConfigNamespace ASTYANAX_NS =
+ new ConfigNamespace(CASSANDRA_NS, "astyanax", "Astyanax-specific Cassandra options");
+
+ /**
+ * Default name for the Cassandra cluster
+ * <p/>
+ */
+ public static final ConfigOption<String> CLUSTER_NAME =
+ new ConfigOption<String>(ASTYANAX_NS, "cluster-name",
+ "Default name for the Cassandra cluster",
+ ConfigOption.Type.MASKABLE, "Titan Cluster");
+
+ /**
+ * Maximum pooled connections per host.
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_CONNECTIONS_PER_HOST =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-connections-per-host",
+ "Maximum pooled connections per host",
+ ConfigOption.Type.MASKABLE, 32);
+
+ /**
+ * Maximum open connections allowed in the pool (counting all hosts).
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_CONNECTIONS =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-connections",
+ "Maximum open connections allowed in the pool (counting all hosts)",
+ ConfigOption.Type.MASKABLE, -1);
+
+ /**
+ * Maximum number of operations allowed per connection before the connection is closed.
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_OPERATIONS_PER_CONNECTION =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-operations-per-connection",
+ "Maximum number of operations allowed per connection before the connection is closed",
+ ConfigOption.Type.MASKABLE, 100 * 1000);
+
+ /**
+ * Maximum pooled "cluster" connections per host.
+ * <p/>
+ * These connections are mostly idle and only used for DDL operations
+ * (like creating keyspaces). Titan doesn't need many of these connections
+ * in ordinary operation.
+ */
+ public static final ConfigOption<Integer> MAX_CLUSTER_CONNECTIONS_PER_HOST =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-cluster-connections-per-host",
+ "Maximum pooled \"cluster\" connections per host",
+ ConfigOption.Type.MASKABLE, 3);
+
+ /**
+ * How Astyanax discovers Cassandra cluster nodes. This must be one of the
+ * values of the Astyanax NodeDiscoveryType enum.
+ * <p/>
+ */
+ public static final ConfigOption<String> NODE_DISCOVERY_TYPE =
+ new ConfigOption<String>(ASTYANAX_NS, "node-discovery-type",
+ "How Astyanax discovers Cassandra cluster nodes",
+ ConfigOption.Type.MASKABLE, "RING_DESCRIBE");
+
+ /**
+ * Astyanax specific host supplier useful only when discovery type set to DISCOVERY_SERVICE or TOKEN_AWARE.
+ * Excepts fully qualified class name which extends google.common.base.Supplier<List<Host>>.
+ */
+ public static final ConfigOption<String> HOST_SUPPLIER =
+ new ConfigOption<String>(ASTYANAX_NS, "host-supplier",
+ "Host supplier to use when discovery type is set to DISCOVERY_SERVICE or TOKEN_AWARE",
+ ConfigOption.Type.MASKABLE, String.class);
+
+ /**
+ * Astyanax's connection pooler implementation. This must be one of the
+ * values of the Astyanax ConnectionPoolType enum.
+ * <p/>
+ */
+ public static final ConfigOption<String> CONNECTION_POOL_TYPE =
+ new ConfigOption<String>(ASTYANAX_NS, "connection-pool-type",
+ "Astyanax's connection pooler implementation",
+ ConfigOption.Type.MASKABLE, "TOKEN_AWARE");
+
+ /**
+ * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
+ * but are used for distinct purposes. RetryPolicy is for retrying failed
+ * operations. RetryBackoffStrategy is for retrying attempts to talk to
+ * uncommunicative hosts. This config option controls RetryPolicy.
+ */
+ public static final ConfigOption<String> RETRY_POLICY =
+ new ConfigOption<String>(ASTYANAX_NS, "retry-policy",
+ "Astyanax's retry policy implementation with configuration parameters",
+ ConfigOption.Type.MASKABLE, "com.netflix.astyanax.retry.BoundedExponentialBackoff,100,25000,8");
+
+ /**
+ * If non-null, this must be the fully-qualified classname (i.e. the
+ * complete package name, a dot, and then the class name) of an
+ * implementation of Astyanax's RetryBackoffStrategy interface. This string
+ * may be followed by a sequence of integers, separated from the full
+ * classname and from each other by commas; in this case, the integers are
+ * cast to native Java ints and passed to the class constructor as
+ * arguments. Here's an example setting that would instantiate an Astyanax
+ * FixedRetryBackoffStrategy with an delay interval of 1s and suspend time
+ * of 5s:
+ * <p/>
+ * <code>
+ * com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000
+ * </code>
+ * <p/>
+ * If null, then Astyanax uses its default strategy, which is an
+ * ExponentialRetryBackoffStrategy instance. The instance parameters take
+ * Astyanax's built-in default values, which can be overridden via the
+ * following config keys:
+ * <ul>
+ * <li>{@link #RETRY_DELAY_SLICE}</li>
+ * <li>{@link #RETRY_MAX_DELAY_SLICE}</li>
+ * <li>{@link #RETRY_SUSPEND_WINDOW}</li>
+ * </ul>
+ * <p/>
+ * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
+ * but are used for distinct purposes. RetryPolicy is for retrying failed
+ * operations. RetryBackoffStrategy is for retrying attempts to talk to
+ * uncommunicative hosts. This config option controls RetryBackoffStrategy.
+ */
+ public static final ConfigOption<String> RETRY_BACKOFF_STRATEGY =
+ new ConfigOption<String>(ASTYANAX_NS, "retry-backoff-strategy",
+ "Astyanax's retry backoff strategy with configuration parameters",
+ ConfigOption.Type.MASKABLE, "com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000");
+
+ /**
+ * Controls the retryDelaySlice parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_DELAY_SLICE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-delay-slice",
+ "Astyanax's connection pool \"retryDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_DELAY_SLICE);
+ /**
+ * Controls the retryMaxDelaySlice parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_MAX_DELAY_SLICE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-max-delay-slice",
+ "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_MAX_DELAY_SLICE);
+
+ /**
+ * Controls the retrySuspendWindow parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_SUSPEND_WINDOW =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-suspend-window",
+ "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_SUSPEND_WINDOW);
+
+ /**
+ * Controls the frame size of thrift sockets created by Astyanax.
+ */
+ public static final ConfigOption<Integer> THRIFT_FRAME_SIZE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "frame-size",
+ "The thrift frame size in mega bytes", ConfigOption.Type.MASKABLE, 15);
+
+ public static final ConfigOption<String> LOCAL_DATACENTER =
+ new ConfigOption<String>(ASTYANAX_NS, "local-datacenter",
+ "The name of the local or closest Cassandra datacenter. When set and not whitespace, " +
+ "this value will be passed into ConnectionPoolConfigurationImpl.setLocalDatacenter. " +
+ "When unset or set to whitespace, setLocalDatacenter will not be invoked.",
+ /* It's between either LOCAL or MASKABLE. MASKABLE could be useful for cases where
+ all the Titan instances are closest to the same Cassandra DC. */
+ ConfigOption.Type.MASKABLE, String.class);
+
+ private final String clusterName;
+
+ private final AstyanaxContext<Keyspace> keyspaceContext;
+ private final AstyanaxContext<Cluster> clusterContext;
+
+ private final RetryPolicy retryPolicy;
+
+ private final int retryDelaySlice;
+ private final int retryMaxDelaySlice;
+ private final int retrySuspendWindow;
+ private final RetryBackoffStrategy retryBackoffStrategy;
+
+ private final String localDatacenter;
+
+ private final Map<String, AstyanaxKeyColumnValueStore> openStores;
+
+ public AstyanaxStoreManager(Configuration config) throws BackendException {
+ super(config);
+
+ this.clusterName = config.get(CLUSTER_NAME);
+
+ retryDelaySlice = config.get(RETRY_DELAY_SLICE);
+ retryMaxDelaySlice = config.get(RETRY_MAX_DELAY_SLICE);
+ retrySuspendWindow = config.get(RETRY_SUSPEND_WINDOW);
+ retryBackoffStrategy = getRetryBackoffStrategy(config.get(RETRY_BACKOFF_STRATEGY));
+ retryPolicy = getRetryPolicy(config.get(RETRY_POLICY));
+
+ localDatacenter = config.has(LOCAL_DATACENTER) ?
+ config.get(LOCAL_DATACENTER) : "";
+
+ final int maxConnsPerHost = config.get(MAX_CONNECTIONS_PER_HOST);
+
+ final int maxClusterConnsPerHost = config.get(MAX_CLUSTER_CONNECTIONS_PER_HOST);
+
+ this.clusterContext = createCluster(getContextBuilder(config, maxClusterConnsPerHost, "Cluster"));
+
+ ensureKeyspaceExists(clusterContext.getClient());
+
+ this.keyspaceContext = getContextBuilder(config, maxConnsPerHost, "Keyspace").buildKeyspace(ThriftFamilyFactory.getInstance());
+ this.keyspaceContext.start();
+
+ openStores = new HashMap<String, AstyanaxKeyColumnValueStore>(8);
+ }
+
+ @Override
+ public Deployment getDeployment() {
+ return Deployment.REMOTE; // TODO
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public IPartitioner getCassandraPartitioner() throws BackendException {
+ Cluster cl = clusterContext.getClient();
+ try {
+ return FBUtilities.newPartitioner(cl.describePartitioner());
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "astyanax" + super.toString();
+ }
+
+ @Override
+ public void close() {
+ // Shutdown the Astyanax contexts
+ openStores.clear();
+ keyspaceContext.shutdown();
+ clusterContext.shutdown();
+ }
+
+ @Override
+ public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
+ if (openStores.containsKey(name)) return openStores.get(name);
+ else {
+ ensureColumnFamilyExists(name);
+ AstyanaxKeyColumnValueStore store = new AstyanaxKeyColumnValueStore(name, keyspaceContext.getClient(), this, retryPolicy);
+ openStores.put(name, store);
+ return store;
+ }
+ }
+
+ @Override
+ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> batch, StoreTransaction txh) throws BackendException {
+ MutationBatch m = keyspaceContext.getClient().prepareMutationBatch().withAtomicBatch(atomicBatch)
+ .setConsistencyLevel(getTx(txh).getWriteConsistencyLevel().getAstyanax())
+ .withRetryPolicy(retryPolicy.duplicate());
+
+ final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+
+ for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> batchentry : batch.entrySet()) {
+ String storeName = batchentry.getKey();
+ Preconditions.checkArgument(openStores.containsKey(storeName), "Store cannot be found: " + storeName);
+
+ ColumnFamily<ByteBuffer, ByteBuffer> columnFamily = openStores.get(storeName).getColumnFamily();
+
+ Map<StaticBuffer, KCVMutation> mutations = batchentry.getValue();
+ for (Map.Entry<StaticBuffer, KCVMutation> ent : mutations.entrySet()) {
+ // The CLMs for additions and deletions are separated because
+ // Astyanax's operation timestamp cannot be set on a per-delete
+ // or per-addition basis.
+ KCVMutation titanMutation = ent.getValue();
+ ByteBuffer key = ent.getKey().asByteBuffer();
+
+ if (titanMutation.hasDeletions()) {
+ ColumnListMutation<ByteBuffer> dels = m.withRow(columnFamily, key);
+ dels.setTimestamp(commitTime.getDeletionTime(times));
+
+ for (StaticBuffer b : titanMutation.getDeletions())
+ dels.deleteColumn(b.as(StaticBuffer.BB_FACTORY));
+ }
+
+ if (titanMutation.hasAdditions()) {
+ ColumnListMutation<ByteBuffer> upds = m.withRow(columnFamily, key);
+ upds.setTimestamp(commitTime.getAdditionTime(times));
+
+ for (Entry e : titanMutation.getAdditions()) {
+ Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
+
+ if (null != ttl && ttl > 0) {
+ upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY), ttl);
+ } else {
+ upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY));
+ }
+ }
+ }
+ }
+ }
+
+ try {
+ m.execute();
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+
+ sleepAfterWrite(txh, commitTime);
+ }
+
+ @Override
+ public List<KeyRange> getLocalKeyPartition() throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearStorage() throws BackendException {
+ try {
+ Cluster cluster = clusterContext.getClient();
+
+ Keyspace ks = cluster.getKeyspace(keySpaceName);
+
+ // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests).
+ // This is called on per test setup basis to make sure that previous test cleaned
+ // everything up, so first invocation would always fail as Keyspace doesn't yet exist.
+ if (ks == null)
+ return;
+
+ for (ColumnFamilyDefinition cf : cluster.describeKeyspace(keySpaceName).getColumnFamilyList()) {
+ ks.truncateColumnFamily(new ColumnFamily<Object, Object>(cf.getName(), null, null));
+ }
+ } catch (ConnectionException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ private void ensureColumnFamilyExists(String name) throws BackendException {
+ ensureColumnFamilyExists(name, "org.apache.cassandra.db.marshal.BytesType");
+ }
+
+ private void ensureColumnFamilyExists(String name, String comparator) throws BackendException {
+ Cluster cl = clusterContext.getClient();
+ try {
+ KeyspaceDefinition ksDef = cl.describeKeyspace(keySpaceName);
+ boolean found = false;
+ if (null != ksDef) {
+ for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
+ found |= cfDef.getName().equals(name);
+ }
+ }
+ if (!found) {
+ ColumnFamilyDefinition cfDef =
+ cl.makeColumnFamilyDefinition()
+ .setName(name)
+ .setKeyspace(keySpaceName)
+ .setComparatorType(comparator);
+
+ ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
+
+ if (compressionEnabled) {
+ compressionOptions.put("sstable_compression", compressionClass)
+ .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
+ }
+
+ cl.addColumnFamily(cfDef.setCompressionOptions(compressionOptions.build()));
+ }
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+ }
+
+ private static AstyanaxContext<Cluster> createCluster(AstyanaxContext.Builder cb) {
+ AstyanaxContext<Cluster> clusterCtx = cb.buildCluster(ThriftFamilyFactory.getInstance());
+ clusterCtx.start();
+
+ return clusterCtx;
+ }
+
+ private AstyanaxContext.Builder getContextBuilder(Configuration config, int maxConnsPerHost, String usedFor) {
+
+ final ConnectionPoolType poolType = ConnectionPoolType.valueOf(config.get(CONNECTION_POOL_TYPE));
+
+ final NodeDiscoveryType discType = NodeDiscoveryType.valueOf(config.get(NODE_DISCOVERY_TYPE));
+
+ final int maxConnections = config.get(MAX_CONNECTIONS);
+
+ final int maxOperationsPerConnection = config.get(MAX_OPERATIONS_PER_CONNECTION);
+
+ final int connectionTimeout = (int) connectionTimeoutMS.toMillis();
+
+ ConnectionPoolConfigurationImpl cpool =
+ new ConnectionPoolConfigurationImpl(usedFor + "TitanConnectionPool")
+ .setPort(port)
+ .setMaxOperationsPerConnection(maxOperationsPerConnection)
+ .setMaxConnsPerHost(maxConnsPerHost)
+ .setRetryDelaySlice(retryDelaySlice)
+ .setRetryMaxDelaySlice(retryMaxDelaySlice)
+ .setRetrySuspendWindow(retrySuspendWindow)
+ .setSocketTimeout(connectionTimeout)
+ .setConnectTimeout(connectionTimeout)
+ .setSeeds(StringUtils.join(hostnames, ","));
+
+ if (null != retryBackoffStrategy) {
+ cpool.setRetryBackoffStrategy(retryBackoffStrategy);
+ log.debug("Custom RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
+ } else {
+ log.debug("Default RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
+ }
+
+ if (StringUtils.isNotBlank(localDatacenter)) {
+ cpool.setLocalDatacenter(localDatacenter);
+ log.debug("Set local datacenter: {}", cpool.getLocalDatacenter());
+ }
+
+ AstyanaxConfigurationImpl aconf =
+ new AstyanaxConfigurationImpl()
+ .setConnectionPoolType(poolType)
+ .setDiscoveryType(discType)
+ .setTargetCassandraVersion("1.2")
+ .setMaxThriftSize(thriftFrameSizeBytes);
+
+ if (0 < maxConnections) {
+ cpool.setMaxConns(maxConnections);
+ }
+
+ if (hasAuthentication()) {
+ cpool.setAuthenticationCredentials(new SimpleAuthenticationCredentials(username, password));
+ }
+
+ if (config.get(SSL_ENABLED)) {
+ cpool.setSSLConnectionContext(new SSLConnectionContext(config.get(SSL_TRUSTSTORE_LOCATION), config.get(SSL_TRUSTSTORE_PASSWORD)));
+ }
+
+ AstyanaxContext.Builder ctxBuilder = new AstyanaxContext.Builder();
+
+ // Standard context builder options
+ ctxBuilder
+ .forCluster(clusterName)
+ .forKeyspace(keySpaceName)
+ .withAstyanaxConfiguration(aconf)
+ .withConnectionPoolConfiguration(cpool)
+ .withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
+
+ // Conditional context builder option: host supplier
+ if (config.has(HOST_SUPPLIER)) {
+ String hostSupplier = config.get(HOST_SUPPLIER);
+ Supplier<List<Host>> supplier = null;
+ if (hostSupplier != null) {
+ try {
+ supplier = (Supplier<List<Host>>) Class.forName(hostSupplier).newInstance();
+ ctxBuilder.withHostSupplier(supplier);
+ } catch (Exception e) {
+ log.warn("Problem with host supplier class " + hostSupplier + ", going to use default.", e);
+ }
+ }
+ }
+
+ return ctxBuilder;
+ }
+
+ private void ensureKeyspaceExists(Cluster cl) throws BackendException {
+ KeyspaceDefinition ksDef;
+
+ try {
+ ksDef = cl.describeKeyspace(keySpaceName);
+
+ if (null != ksDef && ksDef.getName().equals(keySpaceName)) {
+ log.debug("Found keyspace {}", keySpaceName);
+ return;
+ }
+ } catch (ConnectionException e) {
+ log.debug("Failed to describe keyspace {}", keySpaceName);
+ }
+
+ log.debug("Creating keyspace {}...", keySpaceName);
+ try {
+ ksDef = cl.makeKeyspaceDefinition()
+ .setName(keySpaceName)
+ .setStrategyClass(storageConfig.get(REPLICATION_STRATEGY))
+ .setStrategyOptions(strategyOptions);
+ cl.addKeyspace(ksDef);
+
+ log.debug("Created keyspace {}", keySpaceName);
+ } catch (ConnectionException e) {
+ log.debug("Failed to create keyspace {}", keySpaceName);
+ throw new TemporaryBackendException(e);
+ }
+ }
+
+ private static RetryBackoffStrategy getRetryBackoffStrategy(String desc) throws PermanentBackendException {
+ if (null == desc)
+ return null;
+
+ String[] tokens = desc.split(",");
+ String policyClassName = tokens[0];
+ int argCount = tokens.length - 1;
+ Integer[] args = new Integer[argCount];
+
+ for (int i = 1; i < tokens.length; i++) {
+ args[i - 1] = Integer.valueOf(tokens[i]);
+ }
+
+ try {
+ RetryBackoffStrategy rbs = instantiate(policyClassName, args, desc);
+ log.debug("Instantiated RetryBackoffStrategy object {} from config string \"{}\"", rbs, desc);
+ return rbs;
+ } catch (Exception e) {
+ throw new PermanentBackendException("Failed to instantiate Astyanax RetryBackoffStrategy implementation", e);
+ }
+ }
+
+ private static RetryPolicy getRetryPolicy(String serializedRetryPolicy) throws BackendException {
+ String[] tokens = serializedRetryPolicy.split(",");
+ String policyClassName = tokens[0];
+ int argCount = tokens.length - 1;
+ Integer[] args = new Integer[argCount];
+ for (int i = 1; i < tokens.length; i++) {
+ args[i - 1] = Integer.valueOf(tokens[i]);
+ }
+
+ try {
+ RetryPolicy rp = instantiate(policyClassName, args, serializedRetryPolicy);
+ log.debug("Instantiated RetryPolicy object {} from config string \"{}\"", rp, serializedRetryPolicy);
+ return rp;
+ } catch (Exception e) {
+ throw new PermanentBackendException("Failed to instantiate Astyanax Retry Policy class", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <V> V instantiate(String policyClassName, Integer[] args, String raw) throws Exception {
+ for (Constructor<?> con : Class.forName(policyClassName).getConstructors()) {
+ Class<?>[] parameterTypes = con.getParameterTypes();
+
+ // match constructor by number of arguments first
+ if (args.length != parameterTypes.length)
+ continue;
+
+ // check if the constructor parameter types are compatible with argument types (which are integer)
+ // note that we allow long.class arguments too because integer is cast to long by runtime.
+ boolean intsOrLongs = true;
+ for (Class<?> pc : parameterTypes) {
+ if (!pc.equals(int.class) && !pc.equals(long.class)) {
+ intsOrLongs = false;
+ break;
+ }
+ }
+
+ // we found a constructor with required number of parameters but times didn't match, let's carry on
+ if (!intsOrLongs)
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("About to instantiate class {} with {} arguments", con.toString(), args.length);
+
+ return (V) con.newInstance(args);
+ }
+
+ throw new Exception("Failed to identify a class matching the Astyanax Retry Policy config string \"" + raw + "\"");
+ }
+
+ @Override
+ public Map<String, String> getCompressionOptions(String cf) throws BackendException {
+ try {
+ Keyspace k = keyspaceContext.getClient();
+
+ KeyspaceDefinition kdef = k.describeKeyspace();
+
+ if (null == kdef) {
+ throw new PermanentBackendException("Keyspace " + k.getKeyspaceName() + " is undefined");
+ }
+
+ ColumnFamilyDefinition cfdef = kdef.getColumnFamily(cf);
+
+ if (null == cfdef) {
+ throw new PermanentBackendException("Column family " + cf + " is undefined");
+ }
+
+ return cfdef.getCompressionOptions();
+ } catch (ConnectionException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+}
+
+