summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java')
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java621
1 files changed, 621 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java
new file mode 100644
index 0000000..d904860
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java
@@ -0,0 +1,621 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.thinkaurelius.titan.diskstorage.EntryMetaData;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.util.system.NetworkUtil;
+
+import org.apache.cassandra.dht.AbstractByteOrderedPartitioner;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+
+import static com.thinkaurelius.titan.diskstorage.configuration.ConfigOption.disallowEmpty;
+
+/**
+ * This class creates {@see CassandraThriftKeyColumnValueStore}s and
+ * handles Cassandra-backed allocation of vertex IDs for Titan (when so
+ * configured).
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+@PreInitializeConfigOptions
+public class CassandraThriftStoreManager extends AbstractCassandraStoreManager {
+
+ public enum PoolExhaustedAction {
+ BLOCK(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK),
+ FAIL(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL),
+ GROW(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+
+ private final byte b;
+
+ PoolExhaustedAction(byte b) {
+ this.b = b;
+ }
+
+ public byte getByte() {
+ return b;
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraThriftStoreManager.class);
+
+ public static final ConfigNamespace THRIFT_NS =
+ new ConfigNamespace(AbstractCassandraStoreManager.CASSANDRA_NS, "thrift",
+ "Options for Titan's own Thrift Cassandra backend");
+
+ public static final ConfigNamespace CPOOL_NS =
+ new ConfigNamespace(THRIFT_NS, "cpool", "Options for the Apache commons-pool connection manager");
+
+ public static final ConfigOption<String> CPOOL_WHEN_EXHAUSTED =
+ new ConfigOption<>(CPOOL_NS, "when-exhausted",
+ "What to do when clients concurrently request more active connections than are allowed " +
+ "by the pool. The value must be one of BLOCK, FAIL, or GROW.",
+ ConfigOption.Type.MASKABLE, String.class, PoolExhaustedAction.BLOCK.toString(),
+ disallowEmpty(String.class));
+
+ public static final ConfigOption<Integer> CPOOL_MAX_TOTAL =
+ new ConfigOption<Integer>(CPOOL_NS, "max-total",
+ "Max number of allowed Thrift connections, idle or active (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, -1);
+
+ public static final ConfigOption<Integer> CPOOL_MAX_ACTIVE =
+ new ConfigOption<Integer>(CPOOL_NS, "max-active",
+ "Maximum number of concurrently in-use connections (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, 16);
+
+ public static final ConfigOption<Integer> CPOOL_MAX_IDLE =
+ new ConfigOption<Integer>(CPOOL_NS, "max-idle",
+ "Maximum number of concurrently idle connections (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, 4);
+
+ public static final ConfigOption<Integer> CPOOL_MIN_IDLE =
+ new ConfigOption<Integer>(CPOOL_NS, "min-idle",
+ "Minimum number of idle connections the pool attempts to maintain",
+ ConfigOption.Type.MASKABLE, 0);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_MAX_WAIT =
+ new ConfigOption<Long>(CPOOL_NS, "max-wait",
+ "Maximum number of milliseconds to block when " + ConfigElement.getPath(CPOOL_WHEN_EXHAUSTED) +
+ " is set to BLOCK. Has no effect when set to actions besides BLOCK. Set to -1 to wait indefinitely.",
+ ConfigOption.Type.MASKABLE, -1L);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_EVICTOR_PERIOD =
+ new ConfigOption<Long>(CPOOL_NS, "evictor-period",
+ "Approximate number of milliseconds between runs of the idle connection evictor. " +
+ "Set to -1 to never run the idle connection evictor.",
+ ConfigOption.Type.MASKABLE, 30L * 1000L);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_MIN_EVICTABLE_IDLE_TIME =
+ new ConfigOption<Long>(CPOOL_NS, "min-evictable-idle-time",
+ "Minimum number of milliseconds a connection must be idle before it is eligible for " +
+ "eviction. See also " + ConfigElement.getPath(CPOOL_EVICTOR_PERIOD) + ". Set to -1 to never evict " +
+ "idle connections.", ConfigOption.Type.MASKABLE, 60L * 1000L);
+
+ public static final ConfigOption<Boolean> CPOOL_IDLE_TESTS =
+ new ConfigOption<Boolean>(CPOOL_NS, "idle-test",
+ "Whether the idle connection evictor validates idle connections and drops those that fail to validate",
+ ConfigOption.Type.MASKABLE, false);
+
+ public static final ConfigOption<Integer> CPOOL_IDLE_TESTS_PER_EVICTION_RUN =
+ new ConfigOption<Integer>(CPOOL_NS, "idle-tests-per-eviction-run",
+ "When the value is negative, e.g. -n, roughly one nth of the idle connections are tested per run. " +
+ "When the value is positive, e.g. n, the min(idle-count, n) connections are tested per run.",
+ ConfigOption.Type.MASKABLE, 0);
+
+
+ private final Map<String, CassandraThriftKeyColumnValueStore> openStores;
+ private final CTConnectionPool pool;
+ private final Deployment deployment;
+
+ public CassandraThriftStoreManager(Configuration config) throws BackendException {
+ super(config);
+
+ /*
+ * This is eventually passed to Thrift's TSocket constructor. The
+ * constructor parameter is of type int.
+ */
+ int thriftTimeoutMS = (int)config.get(GraphDatabaseConfiguration.CONNECTION_TIMEOUT).toMillis();
+
+ CTConnectionFactory.Config factoryConfig = new CTConnectionFactory.Config(hostnames, port, username, password)
+ .setTimeoutMS(thriftTimeoutMS)
+ .setFrameSize(thriftFrameSizeBytes);
+
+ if (config.get(SSL_ENABLED)) {
+ factoryConfig.setSSLTruststoreLocation(config.get(SSL_TRUSTSTORE_LOCATION));
+ factoryConfig.setSSLTruststorePassword(config.get(SSL_TRUSTSTORE_PASSWORD));
+ }
+
+ final PoolExhaustedAction poolExhaustedAction = ConfigOption.getEnumValue(
+ config.get(CPOOL_WHEN_EXHAUSTED), PoolExhaustedAction.class);
+
+ CTConnectionPool p = new CTConnectionPool(factoryConfig.build());
+ p.setTestOnBorrow(true);
+ p.setTestOnReturn(true);
+ p.setTestWhileIdle(config.get(CPOOL_IDLE_TESTS));
+ p.setNumTestsPerEvictionRun(config.get(CPOOL_IDLE_TESTS_PER_EVICTION_RUN));
+ p.setWhenExhaustedAction(poolExhaustedAction.getByte());
+ p.setMaxActive(config.get(CPOOL_MAX_ACTIVE));
+ p.setMaxTotal(config.get(CPOOL_MAX_TOTAL)); // maxTotal limits active + idle
+ p.setMaxIdle(config.get(CPOOL_MAX_IDLE));
+ p.setMinIdle(config.get(CPOOL_MIN_IDLE));
+ p.setMaxWait(config.get(CPOOL_MAX_WAIT));
+ p.setTimeBetweenEvictionRunsMillis(config.get(CPOOL_EVICTOR_PERIOD));
+ p.setMinEvictableIdleTimeMillis(config.get(CPOOL_MIN_EVICTABLE_IDLE_TIME));
+ this.pool = p;
+
+ this.openStores = new HashMap<String, CassandraThriftKeyColumnValueStore>();
+
+ // Only watch the ring and change endpoints with BOP
+ if (getCassandraPartitioner() instanceof ByteOrderedPartitioner) {
+ deployment = (hostnames.length == 1)// mark deployment as local only in case we have byte ordered partitioner and local connection
+ ? (NetworkUtil.isLocalConnection(hostnames[0])) ? Deployment.LOCAL : Deployment.REMOTE
+ : Deployment.REMOTE;
+ } else {
+ deployment = Deployment.REMOTE;
+ }
+ }
+
+ @Override
+ public Deployment getDeployment() {
+ return deployment;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public IPartitioner getCassandraPartitioner() throws BackendException {
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(SYSTEM_KS);
+ return FBUtilities.newPartitioner(conn.getClient().describe_partitioner());
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(SYSTEM_KS, conn);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "thriftCassandra" + super.toString();
+ }
+
+ @Override
+ public void close() throws BackendException {
+ openStores.clear();
+ closePool();
+ }
+
+ @Override
+ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
+ Preconditions.checkNotNull(mutations);
+
+ final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+
+ ConsistencyLevel consistency = getTx(txh).getWriteConsistencyLevel().getThrift();
+
+ // Generate Thrift-compatible batch_mutate() datastructure
+ // key -> cf -> cassmutation
+ int size = 0;
+ for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size();
+ Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch =
+ new HashMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>>(size);
+
+
+ for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
+ String columnFamily = keyMutation.getKey();
+ for (Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
+ ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();
+
+ // Get or create the single Cassandra Mutation object responsible for this key
+ Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation = batch.get(keyBB);
+ if (cfmutation == null) {
+ cfmutation = new HashMap<String, List<org.apache.cassandra.thrift.Mutation>>(3); // Most mutations only modify the edgeStore and indexStore
+ batch.put(keyBB, cfmutation);
+ }
+
+ KCVMutation mutation = mutEntry.getValue();
+ List<org.apache.cassandra.thrift.Mutation> thriftMutation =
+ new ArrayList<org.apache.cassandra.thrift.Mutation>(mutations.size());
+
+ if (mutation.hasDeletions()) {
+ for (StaticBuffer buf : mutation.getDeletions()) {
+ Deletion d = new Deletion();
+ SlicePredicate sp = new SlicePredicate();
+ sp.addToColumn_names(buf.as(StaticBuffer.BB_FACTORY));
+ d.setPredicate(sp);
+ d.setTimestamp(commitTime.getDeletionTime(times));
+ org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
+ m.setDeletion(d);
+ thriftMutation.add(m);
+ }
+ }
+
+ if (mutation.hasAdditions()) {
+ for (Entry ent : mutation.getAdditions()) {
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
+ column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));
+
+ column.setTimestamp(commitTime.getAdditionTime(times));
+
+ Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
+ if (null != ttl && ttl > 0) {
+ column.setTtl(ttl);
+ }
+
+ cosc.setColumn(column);
+ org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
+ m.setColumn_or_supercolumn(cosc);
+ thriftMutation.add(m);
+ }
+ }
+
+ cfmutation.put(columnFamily, thriftMutation);
+ }
+ }
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(keySpaceName);
+ Cassandra.Client client = conn.getClient();
+ if (atomicBatch) {
+ client.atomic_batch_mutate(batch, consistency);
+ } else {
+ client.batch_mutate(batch, consistency);
+ }
+ } catch (Exception ex) {
+ throw CassandraThriftKeyColumnValueStore.convertException(ex);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+
+ sleepAfterWrite(txh, commitTime);
+ }
+
+ @Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap
+ public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, StoreMetaData.Container metaData) throws BackendException {
+ if (openStores.containsKey(name))
+ return openStores.get(name);
+
+ ensureColumnFamilyExists(keySpaceName, name);
+
+ CassandraThriftKeyColumnValueStore store = new CassandraThriftKeyColumnValueStore(keySpaceName, name, this, pool);
+ openStores.put(name, store);
+ return store;
+ }
+
+ @Override
+ public List<KeyRange> getLocalKeyPartition() throws BackendException {
+ CTConnection conn = null;
+ IPartitioner partitioner = getCassandraPartitioner();
+
+ if (!(partitioner instanceof AbstractByteOrderedPartitioner))
+ throw new UnsupportedOperationException("getLocalKeyPartition() only supported by byte ordered partitioner.");
+
+ Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
+
+ try {
+ // Resist the temptation to describe SYSTEM_KS. It has no ring.
+ // Instead, we'll create our own keyspace (or check that it exists), then describe it.
+ ensureKeyspaceExists(keySpaceName);
+
+ conn = pool.borrowObject(keySpaceName);
+ List<TokenRange> ranges = conn.getClient().describe_ring(keySpaceName);
+ List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
+
+ for (TokenRange range : ranges) {
+ if (!NetworkUtil.hasLocalAddress(range.endpoints))
+ continue;
+
+ keyRanges.add(CassandraHelper.transformRange(tokenFactory.fromString(range.start_token), tokenFactory.fromString(range.end_token)));
+ }
+
+ return keyRanges;
+ } catch (Exception e) {
+ throw CassandraThriftKeyColumnValueStore.convertException(e);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+ }
+
+ /**
+ * Connect to Cassandra via Thrift on the specified host and port and attempt to truncate the named keyspace.
+ * <p/>
+ * This is a utility method intended mainly for testing. It is
+ * equivalent to issuing 'truncate <cf>' for each of the column families in keyspace using
+ * the cassandra-cli tool.
+ * <p/>
+ * Using truncate is better for a number of reasons, most significantly because it doesn't
+ * involve any schema modifications which can take time to propagate across the cluster such
+ * leaves nodes in the inconsistent state and could result in read/write failures.
+ * Any schema modifications are discouraged until there is no traffic to Keyspace or ColumnFamilies.
+ *
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException if any checked Thrift or UnknownHostException is thrown in the body of this method
+ */
+ public void clearStorage() throws BackendException {
+ openStores.clear();
+ final String lp = "ClearStorage: "; // "log prefix"
+ /*
+ * log4j is capable of automatically writing the name of a method that
+ * generated a log message, but the docs warn that "generating caller
+ * location information is extremely slow and should be avoided unless
+ * execution speed is not an issue."
+ */
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(SYSTEM_KS);
+ Cassandra.Client client = conn.getClient();
+
+ KsDef ksDef;
+ try {
+ client.set_keyspace(keySpaceName);
+ ksDef = client.describe_keyspace(keySpaceName);
+ } catch (NotFoundException e) {
+ log.debug(lp + "Keyspace {} does not exist, not attempting to truncate.", keySpaceName);
+ return;
+ } catch (InvalidRequestException e) {
+ log.debug(lp + "InvalidRequestException when attempting to describe keyspace {}, not attempting to truncate.", keySpaceName);
+ return;
+ }
+
+
+ if (null == ksDef) {
+ log.debug(lp + "Received null KsDef for keyspace {}; not truncating its CFs", keySpaceName);
+ return;
+ }
+
+ List<CfDef> cfDefs = ksDef.getCf_defs();
+
+ if (null == cfDefs) {
+ log.debug(lp + "Received empty CfDef list for keyspace {}; not truncating CFs", keySpaceName);
+ return;
+ }
+
+ for (CfDef cfDef : ksDef.getCf_defs()) {
+ client.truncate(cfDef.name);
+ log.info(lp + "Truncated CF {} in keyspace {}", cfDef.name, keySpaceName);
+ }
+
+ /*
+ * Clearing the CTConnectionPool is unnecessary. This method
+ * removes no keyspaces. All open Cassandra connections will
+ * remain valid.
+ */
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ if (conn != null && conn.getClient() != null) {
+ try {
+ conn.getClient().set_keyspace(SYSTEM_KS);
+ } catch (InvalidRequestException e) {
+ log.warn("Failed to reset keyspace", e);
+ } catch (TException e) {
+ log.warn("Failed to reset keyspace", e);
+ }
+ }
+ pool.returnObjectUnsafe(SYSTEM_KS, conn);
+ }
+ }
+
+ private KsDef ensureKeyspaceExists(String keyspaceName) throws TException, BackendException {
+ CTConnection connection = null;
+
+ try {
+ connection = pool.borrowObject(SYSTEM_KS);
+ Cassandra.Client client = connection.getClient();
+
+ try {
+ // Side effect: throws Exception if keyspaceName doesn't exist
+ client.set_keyspace(keyspaceName); // Don't remove
+ client.set_keyspace(SYSTEM_KS);
+ log.debug("Found existing keyspace {}", keyspaceName);
+ } catch (InvalidRequestException e) {
+ // Keyspace didn't exist; create it
+ log.debug("Creating keyspace {}...", keyspaceName);
+
+ KsDef ksdef = new KsDef().setName(keyspaceName)
+ .setCf_defs(new LinkedList<CfDef>()) // cannot be null but can be empty
+ .setStrategy_class(storageConfig.get(REPLICATION_STRATEGY))
+ .setStrategy_options(strategyOptions);
+
+ client.set_keyspace(SYSTEM_KS);
+ try {
+ client.system_add_keyspace(ksdef);
+ retrySetKeyspace(keyspaceName, client);
+ log.debug("Created keyspace {}", keyspaceName);
+ } catch (InvalidRequestException ire) {
+ log.error("system_add_keyspace failed for keyspace=" + keyspaceName, ire);
+ throw ire;
+ }
+
+ }
+
+ return client.describe_keyspace(keyspaceName);
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(SYSTEM_KS, connection);
+ }
+ }
+
+ private void retrySetKeyspace(String ksName, Cassandra.Client client) throws BackendException {
+ final long end = System.currentTimeMillis() + (60L * 1000L);
+
+ while (System.currentTimeMillis() <= end) {
+ try {
+ client.set_keyspace(ksName);
+ return;
+ } catch (Exception e) {
+ log.warn("Exception when changing to keyspace {} after creating it", ksName, e);
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ie) {
+ throw new PermanentBackendException("Unexpected interrupt (shutting down?)", ie);
+ }
+ }
+ }
+
+ throw new PermanentBackendException("Could change to keyspace " + ksName + " after creating it");
+ }
+
+ private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException {
+ ensureColumnFamilyExists(ksName, cfName, "org.apache.cassandra.db.marshal.BytesType");
+ }
+
+ private void ensureColumnFamilyExists(String ksName, String cfName, String comparator) throws BackendException {
+ CTConnection conn = null;
+ try {
+ KsDef keyspaceDef = ensureKeyspaceExists(ksName);
+
+ conn = pool.borrowObject(ksName);
+ Cassandra.Client client = conn.getClient();
+
+ log.debug("Looking up metadata on keyspace {}...", ksName);
+
+ boolean foundColumnFamily = false;
+ for (CfDef cfDef : keyspaceDef.getCf_defs()) {
+ String curCfName = cfDef.getName();
+ if (curCfName.equals(cfName))
+ foundColumnFamily = true;
+ }
+
+ if (!foundColumnFamily) {
+ createColumnFamily(client, ksName, cfName, comparator);
+ } else {
+ log.debug("Keyspace {} and ColumnFamily {} were found.", ksName, cfName);
+ }
+ } catch (SchemaDisagreementException e) {
+ throw new TemporaryBackendException(e);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(ksName, conn);
+ }
+ }
+
+ private void createColumnFamily(Cassandra.Client client,
+ String ksName,
+ String cfName,
+ String comparator) throws BackendException {
+
+ CfDef createColumnFamily = new CfDef();
+ createColumnFamily.setName(cfName);
+ createColumnFamily.setKeyspace(ksName);
+ createColumnFamily.setComparator_type(comparator);
+
+ ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
+
+ if (compressionEnabled) {
+ compressionOptions.put("sstable_compression", compressionClass)
+ .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
+ }
+
+ createColumnFamily.setCompression_options(compressionOptions.build());
+
+ // Hard-coded caching settings
+ if (cfName.startsWith(Backend.EDGESTORE_NAME)) {
+ createColumnFamily.setCaching("keys_only");
+ } else if (cfName.startsWith(Backend.INDEXSTORE_NAME)) {
+ createColumnFamily.setCaching("rows_only");
+ }
+
+ log.debug("Adding column family {} to keyspace {}...", cfName, ksName);
+ try {
+ client.system_add_column_family(createColumnFamily);
+ } catch (SchemaDisagreementException e) {
+ throw new TemporaryBackendException("Error in setting up column family", e);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ }
+
+ log.debug("Added column family {} to keyspace {}.", cfName, ksName);
+ }
+
+ @Override
+ public Map<String, String> getCompressionOptions(String cf) throws BackendException {
+ CTConnection conn = null;
+ Map<String, String> result = null;
+
+ try {
+ conn = pool.borrowObject(keySpaceName);
+ Cassandra.Client client = conn.getClient();
+
+ KsDef ksDef = client.describe_keyspace(keySpaceName);
+
+ for (CfDef cfDef : ksDef.getCf_defs()) {
+ if (null != cfDef && cfDef.getName().equals(cf)) {
+ result = cfDef.getCompression_options();
+ break;
+ }
+ }
+
+ return result;
+ } catch (InvalidRequestException e) {
+ log.debug("Keyspace {} does not exist", keySpaceName);
+ return null;
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+ }
+
+ private void closePool() {
+ /*
+ * pool.close() does not affect borrowed connections.
+ *
+ * Connections currently borrowed by some thread which are
+ * talking to the old host will eventually be destroyed by
+ * CTConnectionFactory#validateObject() returning false when
+ * those connections are returned to the pool.
+ */
+ try {
+ pool.close();
+ log.info("Closed Thrift connection pooler.");
+ } catch (Exception e) {
+ log.warn("Failed to close connection pooler. "
+ + "We might be leaking Cassandra connections.", e);
+ // There's still hope: CTConnectionFactory#validateObject()
+ // will be called on borrow() and might tear down the
+ // connections that close() failed to tear down
+ }
+ }
+}