summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMichael Lando <ml636r@att.com>2017-05-14 11:32:52 +0300
committerMichael Lando <ml636r@att.com>2017-05-14 11:46:35 +0300
commit6921632f70d9321b7e37c2a00618a21bccc0e765 (patch)
tree6789a7688e2fc9275c7ca63071ac2068c49b1746 /src
first commit for new repo
Change-Id: I307341ad768d38a60fb49f96b5b40186c0bd1205 Signed-off-by: Michael Lando <ml636r@att.com>
Diffstat (limited to 'src')
-rw-r--r--src/assembly/distribution.xml80
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java297
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java59
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java10
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java53
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java358
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java672
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java527
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java399
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java536
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java621
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java55
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java210
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java64
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java87
-rw-r--r--src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java139
-rw-r--r--src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java182
-rw-r--r--src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java22
-rw-r--r--src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java18
-rw-r--r--src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java24
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java136
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java79
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java34
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java28
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java19
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java14
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java24
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java15
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java42
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java30
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java22
-rw-r--r--src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java28
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java95
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java24
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java13
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java28
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java13
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java28
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java24
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java30
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java22
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java56
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java13
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java20
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java25
-rw-r--r--src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java21
-rw-r--r--src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java10
-rw-r--r--src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java9
-rw-r--r--src/test/resources/log4j.properties35
-rw-r--r--src/test/resources/rexster-fragment.xml13
68 files changed, 5769 insertions, 0 deletions
diff --git a/src/assembly/distribution.xml b/src/assembly/distribution.xml
new file mode 100644
index 0000000..72ad2f0
--- /dev/null
+++ b/src/assembly/distribution.xml
@@ -0,0 +1,80 @@
+<assembly>
+ <id>distribution</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+ <fileSets>
+ <fileSet>
+ <fileMode>0775</fileMode>
+ <directory>../titan-all/src/main/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>*.sh</include>
+ <include>*.bat</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src</directory>
+ </fileSet>
+ <fileSet>
+ <directory>../doc</directory>
+ </fileSet>
+ <fileSet>
+ <directory>../config</directory>
+ </fileSet>
+ <fileSet>
+ <directory>../target/site/apidocs</directory>
+ <outputDirectory>doc/javadoc</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>target/*.jar</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>../bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>cassandra*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <files>
+ <file>
+ <source>pom.xml</source>
+ <outputDirectory>src</outputDirectory>
+ </file>
+ <file>
+ <source>../LICENSE.txt</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
+ <file>
+ <source>../CHANGELOG.textile</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
+ <file>
+ <source>../UPGRADE.textile</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
+ <file>
+ <source>../NOTICE.txt</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
+ <file>
+ <source>../titan-all/src/main/bin/README.txt</source>
+ <outputDirectory>/</outputDirectory>
+ </file>
+ </files>
+
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/lib</outputDirectory>
+ <unpack>false</unpack>
+ <scope>compile</scope>
+ </dependencySet>
+ <dependencySet>
+ <outputDirectory>/lib</outputDirectory>
+ <unpack>false</unpack>
+ <scope>provided</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java
new file mode 100644
index 0000000..1e12ada
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java
@@ -0,0 +1,297 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import java.util.*;
+
+import com.google.common.collect.ImmutableMap;
+import com.thinkaurelius.titan.core.TitanException;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*;
+
+import org.apache.cassandra.dht.IPartitioner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@PreInitializeConfigOptions
+public abstract class AbstractCassandraStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
+
+ public enum Partitioner {
+
+ RANDOM, BYTEORDER;
+
+ public static Partitioner getPartitioner(IPartitioner partitioner) {
+ return getPartitioner(partitioner.getClass().getSimpleName());
+ }
+
+ public static Partitioner getPartitioner(String className) {
+ if (className.endsWith("RandomPartitioner") || className.endsWith("Murmur3Partitioner"))
+ return Partitioner.RANDOM;
+ else if (className.endsWith("ByteOrderedPartitioner")) return Partitioner.BYTEORDER;
+ else throw new IllegalArgumentException("Unsupported partitioner: " + className);
+ }
+ }
+
+ //################### CASSANDRA SPECIFIC CONFIGURATION OPTIONS ######################
+
+ public static final ConfigNamespace CASSANDRA_NS =
+ new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "cassandra", "Cassandra storage backend options");
+
+ public static final ConfigOption<String> CASSANDRA_KEYSPACE =
+ new ConfigOption<String>(CASSANDRA_NS, "keyspace",
+ "The name of Titan's keyspace. It will be created if it does not exist.",
+ ConfigOption.Type.LOCAL, "titan");
+
+ // Consistency Levels and Atomic Batch
+ public static final ConfigOption<String> CASSANDRA_READ_CONSISTENCY =
+ new ConfigOption<String>(CASSANDRA_NS, "read-consistency-level",
+ "The consistency level of read operations against Cassandra",
+ ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
+
+ public static final ConfigOption<String> CASSANDRA_WRITE_CONSISTENCY =
+ new ConfigOption<String>(CASSANDRA_NS, "write-consistency-level",
+ "The consistency level of write operations against Cassandra",
+ ConfigOption.Type.MASKABLE, "LOCAL_QUORUM");
+
+ public static final ConfigOption<Boolean> ATOMIC_BATCH_MUTATE =
+ new ConfigOption<Boolean>(CASSANDRA_NS, "atomic-batch-mutate",
+ "True to use Cassandra atomic batch mutation, false to use non-atomic batches",
+ ConfigOption.Type.MASKABLE, true);
+
+ // Replication
+ public static final ConfigOption<Integer> REPLICATION_FACTOR =
+ new ConfigOption<Integer>(CASSANDRA_NS, "replication-factor",
+ "The number of data replicas (including the original copy) that should be kept. " +
+ "This is only meaningful for storage backends that natively support data replication.",
+ ConfigOption.Type.GLOBAL_OFFLINE, 1);
+
+ public static final ConfigOption<String> REPLICATION_STRATEGY =
+ new ConfigOption<String>(CASSANDRA_NS, "replication-strategy-class",
+ "The replication strategy to use for Titan keyspace",
+ ConfigOption.Type.FIXED, "org.apache.cassandra.locator.SimpleStrategy");
+
+ public static final ConfigOption<String[]> REPLICATION_OPTIONS =
+ new ConfigOption<String[]>(CASSANDRA_NS, "replication-strategy-options",
+ "Replication strategy options, e.g. factor or replicas per datacenter. This list is interpreted as a " +
+ "map. It must have an even number of elements in [key,val,key,val,...] form. A replication_factor set " +
+ "here takes precedence over one set with " + ConfigElement.getPath(REPLICATION_FACTOR),
+ ConfigOption.Type.FIXED, String[].class);
+
+ // Compression
+ public static final ConfigOption<Boolean> CF_COMPRESSION =
+ new ConfigOption<Boolean>(CASSANDRA_NS, "compression",
+ "Whether the storage backend should use compression when storing the data", ConfigOption.Type.FIXED, true);
+
+ public static final ConfigOption<String> CF_COMPRESSION_TYPE =
+ new ConfigOption<String>(CASSANDRA_NS, "compression-type",
+ "The sstable_compression value Titan uses when creating column families. " +
+ "This accepts any value allowed by Cassandra's sstable_compression option. " +
+ "Leave this unset to disable sstable_compression on Titan-created CFs.",
+ ConfigOption.Type.MASKABLE, "LZ4Compressor");
+
+ public static final ConfigOption<Integer> CF_COMPRESSION_BLOCK_SIZE =
+ new ConfigOption<Integer>(CASSANDRA_NS, "compression-block-size",
+ "The size of the compression blocks in kilobytes", ConfigOption.Type.FIXED, 64);
+
+ // SSL
+ public static final ConfigNamespace SSL_NS =
+ new ConfigNamespace(CASSANDRA_NS, "ssl", "Configuration options for SSL");
+
+ public static final ConfigNamespace SSL_TRUSTSTORE_NS =
+ new ConfigNamespace(SSL_NS, "truststore", "Configuration options for SSL Truststore.");
+
+ public static final ConfigOption<Boolean> SSL_ENABLED =
+ new ConfigOption<Boolean>(SSL_NS, "enabled",
+ "Controls use of the SSL connection to Cassandra", ConfigOption.Type.LOCAL, false);
+
+ public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION =
+ new ConfigOption<String>(SSL_TRUSTSTORE_NS, "location",
+ "Marks the location of the SSL Truststore.", ConfigOption.Type.LOCAL, "");
+
+ public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD =
+ new ConfigOption<String>(SSL_TRUSTSTORE_NS, "password",
+ "The password to access SSL Truststore.", ConfigOption.Type.LOCAL, "");
+
+ // Thrift transport
+ public static final ConfigOption<Integer> THRIFT_FRAME_SIZE_MB =
+ new ConfigOption<>(CASSANDRA_NS, "frame-size-mb",
+ "The thrift frame size in megabytes", ConfigOption.Type.MASKABLE, 15);
+
+ /**
+ * The default Thrift port used by Cassandra. Set
+ * {@link GraphDatabaseConfiguration#STORAGE_PORT} to override.
+ * <p>
+ * Value = {@value}
+ */
+ public static final int PORT_DEFAULT = 9160;
+
+ public static final String SYSTEM_KS = "system";
+
+ protected final String keySpaceName;
+ protected final Map<String, String> strategyOptions;
+
+ protected final boolean compressionEnabled;
+ protected final int compressionChunkSizeKB;
+ protected final String compressionClass;
+
+ protected final boolean atomicBatch;
+
+ protected final int thriftFrameSizeBytes;
+
+ private volatile StoreFeatures features = null;
+ private Partitioner partitioner = null;
+
+ private static final Logger log =
+ LoggerFactory.getLogger(AbstractCassandraStoreManager.class);
+
+ public AbstractCassandraStoreManager(Configuration config) {
+ super(config, PORT_DEFAULT);
+
+ this.keySpaceName = config.get(CASSANDRA_KEYSPACE);
+ this.compressionEnabled = config.get(CF_COMPRESSION);
+ this.compressionChunkSizeKB = config.get(CF_COMPRESSION_BLOCK_SIZE);
+ this.compressionClass = config.get(CF_COMPRESSION_TYPE);
+ this.atomicBatch = config.get(ATOMIC_BATCH_MUTATE);
+ this.thriftFrameSizeBytes = config.get(THRIFT_FRAME_SIZE_MB) * 1024 * 1024;
+
+ // SSL truststore location sanity check
+ if (config.get(SSL_ENABLED) && config.get(SSL_TRUSTSTORE_LOCATION).isEmpty())
+ throw new IllegalArgumentException(SSL_TRUSTSTORE_LOCATION.getName() + " could not be empty when SSL is enabled.");
+
+ if (config.has(REPLICATION_OPTIONS)) {
+ String[] options = config.get(REPLICATION_OPTIONS);
+
+ if (options.length % 2 != 0)
+ throw new IllegalArgumentException(REPLICATION_OPTIONS.getName() + " should have even number of elements.");
+
+ Map<String, String> converted = new HashMap<String, String>(options.length / 2);
+
+ for (int i = 0; i < options.length; i += 2) {
+ converted.put(options[i], options[i + 1]);
+ }
+
+ this.strategyOptions = ImmutableMap.copyOf(converted);
+ } else {
+ this.strategyOptions = ImmutableMap.of("replication_factor", String.valueOf(config.get(REPLICATION_FACTOR)));
+ }
+ }
+
+ public final Partitioner getPartitioner() {
+ if (partitioner == null) {
+ try {
+ partitioner = Partitioner.getPartitioner(getCassandraPartitioner());
+ } catch (BackendException e) {
+ throw new TitanException("Could not connect to Cassandra to read partitioner information. Please check the connection", e);
+ }
+ }
+ assert partitioner != null;
+ return partitioner;
+ }
+
+ public abstract IPartitioner getCassandraPartitioner() throws BackendException;
+
+ @Override
+ public StoreTransaction beginTransaction(final BaseTransactionConfig config) {
+ return new CassandraTransaction(config);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + keySpaceName + "@" + super.toString() + "]";
+ }
+
+ @Override
+ public StoreFeatures getFeatures() {
+
+ if (features == null) {
+
+ Configuration global = GraphDatabaseConfiguration.buildGraphConfiguration()
+ .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
+ .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
+ .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
+
+ Configuration local = GraphDatabaseConfiguration.buildGraphConfiguration()
+ .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM")
+ .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM")
+ .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT);
+
+ StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder();
+
+ fb.batchMutation(true).distributed(true);
+ fb.timestamps(true).cellTTL(true);
+ fb.keyConsistent(global, local);
+
+ boolean keyOrdered;
+
+ switch (getPartitioner()) {
+ case RANDOM:
+ keyOrdered = false;
+ fb.keyOrdered(keyOrdered).orderedScan(false).unorderedScan(true);
+ break;
+
+ case BYTEORDER:
+ keyOrdered = true;
+ fb.keyOrdered(keyOrdered).orderedScan(true).unorderedScan(false);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unrecognized partitioner: " + getPartitioner());
+ }
+
+ switch (getDeployment()) {
+ case REMOTE:
+ fb.multiQuery(true);
+ break;
+
+ case LOCAL:
+ fb.multiQuery(true).localKeyPartition(keyOrdered);
+ break;
+
+ case EMBEDDED:
+ fb.multiQuery(false).localKeyPartition(keyOrdered);
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unrecognized deployment mode: " + getDeployment());
+ }
+
+ features = fb.build();
+ }
+
+ return features;
+ }
+
+ /**
+ * Returns a map of compression options for the column family {@code cf}.
+ * The contents of the returned map must be identical to the contents of the
+ * map returned by
+ * {@link org.apache.cassandra.thrift.CfDef#getCompression_options()}, even
+ * for implementations of this method that don't use Thrift.
+ *
+ * @param cf the name of the column family for which to return compression
+ * options
+ * @return map of compression option names to compression option values
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException if reading from Cassandra fails
+ */
+ public abstract Map<String, String> getCompressionOptions(String cf) throws BackendException;
+
+ public String getName() {
+ return getClass().getSimpleName() + keySpaceName;
+ }
+
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java
new file mode 100644
index 0000000..f3b1ca8
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java
@@ -0,0 +1,59 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This enum unites different libraries' consistency level enums, streamlining
+ * configuration and processing in {@link AbstractCassandraStoreManager}.
+ *
+ */
+public enum CLevel implements CLevelInterface { // One ring to rule them all
+ ANY,
+ ONE,
+ TWO,
+ THREE,
+ QUORUM,
+ ALL,
+ LOCAL_QUORUM,
+ EACH_QUORUM;
+
+ private final org.apache.cassandra.db.ConsistencyLevel db;
+ private final org.apache.cassandra.thrift.ConsistencyLevel thrift;
+ private final com.netflix.astyanax.model.ConsistencyLevel astyanax;
+
+ private CLevel() {
+ db = org.apache.cassandra.db.ConsistencyLevel.valueOf(toString());
+ thrift = org.apache.cassandra.thrift.ConsistencyLevel.valueOf(toString());
+ astyanax = com.netflix.astyanax.model.ConsistencyLevel.valueOf("CL_" + toString());
+ }
+
+ @Override
+ public org.apache.cassandra.db.ConsistencyLevel getDB() {
+ return db;
+ }
+
+ @Override
+ public org.apache.cassandra.thrift.ConsistencyLevel getThrift() {
+ return thrift;
+ }
+
+ @Override
+ public com.netflix.astyanax.model.ConsistencyLevel getAstyanax() {
+ return astyanax;
+ }
+
+ public static CLevel parse(String value) {
+ Preconditions.checkArgument(value != null && !value.isEmpty());
+ value = value.trim();
+ if (value.equals("1")) return ONE;
+ else if (value.equals("2")) return TWO;
+ else if (value.equals("3")) return THREE;
+ else {
+ for (CLevel c : values()) {
+ if (c.toString().equalsIgnoreCase(value) ||
+ ("CL_" + c.toString()).equalsIgnoreCase(value)) return c;
+ }
+ }
+ throw new IllegalArgumentException("Unrecognized cassandra consistency level: " + value);
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java
new file mode 100644
index 0000000..76d7ede
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java
@@ -0,0 +1,10 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+public interface CLevelInterface {
+
+ public org.apache.cassandra.db.ConsistencyLevel getDB();
+
+ public org.apache.cassandra.thrift.ConsistencyLevel getThrift();
+
+ public com.netflix.astyanax.model.ConsistencyLevel getAstyanax();
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java
new file mode 100644
index 0000000..e94c08f
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java
@@ -0,0 +1,53 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY;
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+
+public class CassandraTransaction extends AbstractStoreTransaction {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraTransaction.class);
+
+ private final CLevel read;
+ private final CLevel write;
+
+ public CassandraTransaction(BaseTransactionConfig c) {
+ super(c);
+ read = CLevel.parse(getConfiguration().getCustomOption(CASSANDRA_READ_CONSISTENCY));
+ write = CLevel.parse(getConfiguration().getCustomOption(CASSANDRA_WRITE_CONSISTENCY));
+ log.debug("Created {}", this.toString());
+ }
+
+ public CLevel getReadConsistencyLevel() {
+ return read;
+ }
+
+ public CLevel getWriteConsistencyLevel() {
+ return write;
+ }
+
+ public static CassandraTransaction getTx(StoreTransaction txh) {
+ Preconditions.checkArgument(txh != null);
+ Preconditions.checkArgument(txh instanceof CassandraTransaction, "Unexpected transaction type %s", txh.getClass().getName());
+ return (CassandraTransaction) txh;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append("CassandraTransaction@");
+ sb.append(Integer.toHexString(hashCode()));
+ sb.append("[read=");
+ sb.append(read);
+ sb.append(",write=");
+ sb.append(write);
+ sb.append("]");
+ return sb.toString();
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java
new file mode 100644
index 0000000..0d3eb9b
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java
@@ -0,0 +1,358 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.*;
+import com.netflix.astyanax.ExceptionCallback;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.*;
+import com.netflix.astyanax.query.AllRowsQuery;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.retry.RetryPolicy;
+import com.netflix.astyanax.serializers.ByteBufferSerializer;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.Partitioner;
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+public class AstyanaxKeyColumnValueStore implements KeyColumnValueStore {
+
+ private final Keyspace keyspace;
+ private final String columnFamilyName;
+ private final ColumnFamily<ByteBuffer, ByteBuffer> columnFamily;
+ private final RetryPolicy retryPolicy;
+ private final AstyanaxStoreManager storeManager;
+ private final AstyanaxGetter entryGetter;
+
+ AstyanaxKeyColumnValueStore(String columnFamilyName,
+ Keyspace keyspace,
+ AstyanaxStoreManager storeManager,
+ RetryPolicy retryPolicy) {
+ this.keyspace = keyspace;
+ this.columnFamilyName = columnFamilyName;
+ this.retryPolicy = retryPolicy;
+ this.storeManager = storeManager;
+
+ entryGetter = new AstyanaxGetter(storeManager.getMetaDataSchema(columnFamilyName));
+
+ columnFamily = new ColumnFamily<ByteBuffer, ByteBuffer>(
+ this.columnFamilyName,
+ ByteBufferSerializer.get(),
+ ByteBufferSerializer.get());
+
+ }
+
+
+ ColumnFamily<ByteBuffer, ByteBuffer> getColumnFamily() {
+ return columnFamily;
+ }
+
+ @Override
+ public void close() throws BackendException {
+ //Do nothing
+ }
+
+ @Override
+ public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
+ return Iterables.getOnlyElement(result.values(),EntryList.EMPTY_LIST);
+ }
+
+ @Override
+ public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(keys, query, txh);
+ }
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
+ SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(ImmutableList.of(key),query,txh);
+ }
+
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
+ SliceQuery query, StoreTransaction txh) throws BackendException {
+ /*
+ * RowQuery<K,C> should be parameterized as
+ * RowQuery<ByteBuffer,ByteBuffer>. However, this causes the following
+ * compilation error when attempting to call withColumnRange on a
+ * RowQuery<ByteBuffer,ByteBuffer> instance:
+ *
+ * java.lang.Error: Unresolved compilation problem: The method
+ * withColumnRange(ByteBuffer, ByteBuffer, boolean, int) is ambiguous
+ * for the type RowQuery<ByteBuffer,ByteBuffer>
+ *
+ * The compiler substitutes ByteBuffer=C for both startColumn and
+ * endColumn, compares it to its identical twin with that type
+ * hard-coded, and dies.
+ *
+ */
+ RowSliceQuery rq = keyspace.prepareQuery(columnFamily)
+ .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
+ .withRetryPolicy(retryPolicy.duplicate())
+ .getKeySlice(CassandraHelper.convert(keys));
+
+ // Thank you, Astyanax, for making builder pattern useful :(
+ rq.withColumnRange(query.getSliceStart().asByteBuffer(),
+ query.getSliceEnd().asByteBuffer(),
+ false,
+ query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
+
+ OperationResult<Rows<ByteBuffer, ByteBuffer>> r;
+ try {
+ r = (OperationResult<Rows<ByteBuffer, ByteBuffer>>) rq.execute();
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+
+ Rows<ByteBuffer,ByteBuffer> rows = r.getResult();
+ Map<StaticBuffer, EntryList> result = new HashMap<StaticBuffer, EntryList>(rows.size());
+
+ for (Row<ByteBuffer, ByteBuffer> row : rows) {
+ assert !result.containsKey(row.getKey());
+ result.put(StaticArrayBuffer.of(row.getKey()),
+ CassandraHelper.makeEntryList(row.getColumns(),entryGetter, query.getSliceEnd(), query.getLimit()));
+ }
+
+ return result;
+ }
+
+ private static class AstyanaxGetter implements StaticArrayEntry.GetColVal<Column<ByteBuffer>,ByteBuffer> {
+
+ private final EntryMetaData[] schema;
+
+ private AstyanaxGetter(EntryMetaData[] schema) {
+ this.schema = schema;
+ }
+
+
+ @Override
+ public ByteBuffer getColumn(Column<ByteBuffer> element) {
+ return element.getName();
+ }
+
+ @Override
+ public ByteBuffer getValue(Column<ByteBuffer> element) {
+ return element.getByteBufferValue();
+ }
+
+ @Override
+ public EntryMetaData[] getMetaSchema(Column<ByteBuffer> element) {
+ return schema;
+ }
+
+ @Override
+ public Object getMetaData(Column<ByteBuffer> element, EntryMetaData meta) {
+ switch(meta) {
+ case TIMESTAMP:
+ return element.getTimestamp();
+ case TTL:
+ return element.getTtl();
+ default:
+ throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+ }
+ }
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+ mutateMany(ImmutableMap.of(key, new KCVMutation(additions, deletions)), txh);
+ }
+
+ public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+ storeManager.mutateMany(ImmutableMap.of(columnFamilyName, mutations), txh);
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
+ if (storeManager.getPartitioner() != Partitioner.RANDOM)
+ throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
+
+ AllRowsQuery allRowsQuery = keyspace.prepareQuery(columnFamily).getAllRows();
+
+ if (sliceQuery != null) {
+ allRowsQuery.withColumnRange(sliceQuery.getSliceStart().asByteBuffer(),
+ sliceQuery.getSliceEnd().asByteBuffer(),
+ false,
+ sliceQuery.getLimit());
+ }
+
+ Rows<ByteBuffer, ByteBuffer> result;
+ try {
+ /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
+ OperationResult op = allRowsQuery.setRowLimit(storeManager.getPageSize()) // pre-fetch that many rows at a time
+ .setConcurrencyLevel(1) // one execution thread for fetching portion of rows
+ .setExceptionCallback(new ExceptionCallback() {
+ private int retries = 0;
+
+ @Override
+ public boolean onException(ConnectionException e) {
+ try {
+ return retries > 2; // make 3 re-tries
+ } finally {
+ retries++;
+ }
+ }
+ }).execute();
+
+ result = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) op).getResult();
+ } catch (ConnectionException e) {
+ throw new PermanentBackendException(e);
+ }
+
+ return new RowIterator(result.iterator(), sliceQuery);
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException {
+ // this query could only be done when byte-ordering partitioner is used
+ // because Cassandra operates on tokens internally which means that even contiguous
+ // range of keys (e.g. time slice) with random partitioner could produce disjoint set of tokens
+ // returning ambiguous results to the user.
+ Partitioner partitioner = storeManager.getPartitioner();
+ if (partitioner != Partitioner.BYTEORDER)
+ throw new PermanentBackendException("getKeys(KeyRangeQuery could only be used with byte-ordering partitioner.");
+
+ ByteBuffer start = query.getKeyStart().asByteBuffer(), end = query.getKeyEnd().asByteBuffer();
+
+ RowSliceQuery rowSlice = keyspace.prepareQuery(columnFamily)
+ .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax())
+ .withRetryPolicy(retryPolicy.duplicate())
+ .getKeyRange(start, end, null, null, Integer.MAX_VALUE);
+
+ // Astyanax is bad at builder pattern :(
+ rowSlice.withColumnRange(query.getSliceStart().asByteBuffer(),
+ query.getSliceEnd().asByteBuffer(),
+ false,
+ query.getLimit());
+
+ // Omit final the query's keyend from the result, if present in result
+ final Rows<ByteBuffer, ByteBuffer> r;
+ try {
+ r = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) rowSlice.execute()).getResult();
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+ Iterator<Row<ByteBuffer, ByteBuffer>> i =
+ Iterators.filter(r.iterator(), new KeySkipPredicate(query.getKeyEnd().asByteBuffer()));
+ return new RowIterator(i, query);
+ }
+
+ @Override
+ public String getName() {
+ return columnFamilyName;
+ }
+
+ private static class KeyIterationPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
+ @Override
+ public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
+ return (row != null) && row.getColumns().size() > 0;
+ }
+ }
+
+ private static class KeySkipPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> {
+
+ private final ByteBuffer skip;
+
+ public KeySkipPredicate(ByteBuffer skip) {
+ this.skip = skip;
+ }
+
+ @Override
+ public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) {
+ return (row != null) && !row.getKey().equals(skip);
+ }
+ }
+
+ private class RowIterator implements KeyIterator {
+ private final Iterator<Row<ByteBuffer, ByteBuffer>> rows;
+ private Row<ByteBuffer, ByteBuffer> currentRow;
+ private final SliceQuery sliceQuery;
+ private boolean isClosed;
+
+ public RowIterator(Iterator<Row<ByteBuffer, ByteBuffer>> rowIter, SliceQuery sliceQuery) {
+ this.rows = Iterators.filter(rowIter, new KeyIterationPredicate());
+ this.sliceQuery = sliceQuery;
+ }
+
+ @Override
+ public RecordIterator<Entry> getEntries() {
+ ensureOpen();
+
+ if (sliceQuery == null)
+ throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
+
+ return new RecordIterator<Entry>() {
+ private final Iterator<Entry> columns =
+ CassandraHelper.makeEntryIterator(currentRow.getColumns(),
+ entryGetter,
+ sliceQuery.getSliceEnd(),sliceQuery.getLimit());
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return columns.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ return columns.next();
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return rows.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+
+ currentRow = rows.next();
+ return StaticArrayBuffer.of(currentRow.getKey());
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void ensureOpen() {
+ if (isClosed)
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java
new file mode 100644
index 0000000..f73ec15
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java
@@ -0,0 +1,672 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.netflix.astyanax.AstyanaxContext;
+import com.netflix.astyanax.Cluster;
+import com.netflix.astyanax.ColumnListMutation;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.Host;
+import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
+import com.netflix.astyanax.connectionpool.RetryBackoffStrategy;
+import com.netflix.astyanax.connectionpool.SSLConnectionContext;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
+import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
+import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
+import com.netflix.astyanax.connectionpool.impl.ExponentialRetryBackoffStrategy;
+import com.netflix.astyanax.connectionpool.impl.SimpleAuthenticationCredentials;
+import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
+import com.netflix.astyanax.ddl.KeyspaceDefinition;
+import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.retry.RetryPolicy;
+import com.netflix.astyanax.thrift.ThriftFamilyFactory;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.Entry;
+import com.thinkaurelius.titan.diskstorage.EntryMetaData;
+import com.thinkaurelius.titan.diskstorage.PermanentBackendException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StoreMetaData;
+import com.thinkaurelius.titan.diskstorage.TemporaryBackendException;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+@PreInitializeConfigOptions
+public class AstyanaxStoreManager extends AbstractCassandraStoreManager {
+
+ private static final Logger log = LoggerFactory.getLogger(AstyanaxStoreManager.class);
+
+ //################### ASTYANAX SPECIFIC CONFIGURATION OPTIONS ######################
+
+ public static final ConfigNamespace ASTYANAX_NS =
+ new ConfigNamespace(CASSANDRA_NS, "astyanax", "Astyanax-specific Cassandra options");
+
+ /**
+ * Default name for the Cassandra cluster
+ * <p/>
+ */
+ public static final ConfigOption<String> CLUSTER_NAME =
+ new ConfigOption<String>(ASTYANAX_NS, "cluster-name",
+ "Default name for the Cassandra cluster",
+ ConfigOption.Type.MASKABLE, "Titan Cluster");
+
+ /**
+ * Maximum pooled connections per host.
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_CONNECTIONS_PER_HOST =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-connections-per-host",
+ "Maximum pooled connections per host",
+ ConfigOption.Type.MASKABLE, 32);
+
+ /**
+ * Maximum open connections allowed in the pool (counting all hosts).
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_CONNECTIONS =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-connections",
+ "Maximum open connections allowed in the pool (counting all hosts)",
+ ConfigOption.Type.MASKABLE, -1);
+
+ /**
+ * Maximum number of operations allowed per connection before the connection is closed.
+ * <p/>
+ */
+ public static final ConfigOption<Integer> MAX_OPERATIONS_PER_CONNECTION =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-operations-per-connection",
+ "Maximum number of operations allowed per connection before the connection is closed",
+ ConfigOption.Type.MASKABLE, 100 * 1000);
+
+ /**
+ * Maximum pooled "cluster" connections per host.
+ * <p/>
+ * These connections are mostly idle and only used for DDL operations
+ * (like creating keyspaces). Titan doesn't need many of these connections
+ * in ordinary operation.
+ */
+ public static final ConfigOption<Integer> MAX_CLUSTER_CONNECTIONS_PER_HOST =
+ new ConfigOption<Integer>(ASTYANAX_NS, "max-cluster-connections-per-host",
+ "Maximum pooled \"cluster\" connections per host",
+ ConfigOption.Type.MASKABLE, 3);
+
+ /**
+ * How Astyanax discovers Cassandra cluster nodes. This must be one of the
+ * values of the Astyanax NodeDiscoveryType enum.
+ * <p/>
+ */
+ public static final ConfigOption<String> NODE_DISCOVERY_TYPE =
+ new ConfigOption<String>(ASTYANAX_NS, "node-discovery-type",
+ "How Astyanax discovers Cassandra cluster nodes",
+ ConfigOption.Type.MASKABLE, "RING_DESCRIBE");
+
+ /**
+ * Astyanax specific host supplier useful only when discovery type set to DISCOVERY_SERVICE or TOKEN_AWARE.
+ * Excepts fully qualified class name which extends google.common.base.Supplier<List<Host>>.
+ */
+ public static final ConfigOption<String> HOST_SUPPLIER =
+ new ConfigOption<String>(ASTYANAX_NS, "host-supplier",
+ "Host supplier to use when discovery type is set to DISCOVERY_SERVICE or TOKEN_AWARE",
+ ConfigOption.Type.MASKABLE, String.class);
+
+ /**
+ * Astyanax's connection pooler implementation. This must be one of the
+ * values of the Astyanax ConnectionPoolType enum.
+ * <p/>
+ */
+ public static final ConfigOption<String> CONNECTION_POOL_TYPE =
+ new ConfigOption<String>(ASTYANAX_NS, "connection-pool-type",
+ "Astyanax's connection pooler implementation",
+ ConfigOption.Type.MASKABLE, "TOKEN_AWARE");
+
+ /**
+ * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
+ * but are used for distinct purposes. RetryPolicy is for retrying failed
+ * operations. RetryBackoffStrategy is for retrying attempts to talk to
+ * uncommunicative hosts. This config option controls RetryPolicy.
+ */
+ public static final ConfigOption<String> RETRY_POLICY =
+ new ConfigOption<String>(ASTYANAX_NS, "retry-policy",
+ "Astyanax's retry policy implementation with configuration parameters",
+ ConfigOption.Type.MASKABLE, "com.netflix.astyanax.retry.BoundedExponentialBackoff,100,25000,8");
+
+ /**
+ * If non-null, this must be the fully-qualified classname (i.e. the
+ * complete package name, a dot, and then the class name) of an
+ * implementation of Astyanax's RetryBackoffStrategy interface. This string
+ * may be followed by a sequence of integers, separated from the full
+ * classname and from each other by commas; in this case, the integers are
+ * cast to native Java ints and passed to the class constructor as
+ * arguments. Here's an example setting that would instantiate an Astyanax
+ * FixedRetryBackoffStrategy with an delay interval of 1s and suspend time
+ * of 5s:
+ * <p/>
+ * <code>
+ * com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000
+ * </code>
+ * <p/>
+ * If null, then Astyanax uses its default strategy, which is an
+ * ExponentialRetryBackoffStrategy instance. The instance parameters take
+ * Astyanax's built-in default values, which can be overridden via the
+ * following config keys:
+ * <ul>
+ * <li>{@link #RETRY_DELAY_SLICE}</li>
+ * <li>{@link #RETRY_MAX_DELAY_SLICE}</li>
+ * <li>{@link #RETRY_SUSPEND_WINDOW}</li>
+ * </ul>
+ * <p/>
+ * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar
+ * but are used for distinct purposes. RetryPolicy is for retrying failed
+ * operations. RetryBackoffStrategy is for retrying attempts to talk to
+ * uncommunicative hosts. This config option controls RetryBackoffStrategy.
+ */
+ public static final ConfigOption<String> RETRY_BACKOFF_STRATEGY =
+ new ConfigOption<String>(ASTYANAX_NS, "retry-backoff-strategy",
+ "Astyanax's retry backoff strategy with configuration parameters",
+ ConfigOption.Type.MASKABLE, "com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000");
+
+ /**
+ * Controls the retryDelaySlice parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_DELAY_SLICE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-delay-slice",
+ "Astyanax's connection pool \"retryDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_DELAY_SLICE);
+ /**
+ * Controls the retryMaxDelaySlice parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_MAX_DELAY_SLICE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-max-delay-slice",
+ "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_MAX_DELAY_SLICE);
+
+ /**
+ * Controls the retrySuspendWindow parameter on Astyanax
+ * ConnectionPoolConfigurationImpl objects, which is in turn used by
+ * ExponentialRetryBackoffStrategy. See the code for
+ * {@link ConnectionPoolConfigurationImpl},
+ * {@link ExponentialRetryBackoffStrategy}, and the javadoc for
+ * {@link #RETRY_BACKOFF_STRATEGY} for more information.
+ * <p/>
+ * This parameter is not meaningful for and has no effect on
+ * FixedRetryBackoffStrategy.
+ */
+ public static final ConfigOption<Integer> RETRY_SUSPEND_WINDOW =
+ new ConfigOption<Integer>(ASTYANAX_NS, "retry-suspend-window",
+ "Astyanax's connection pool \"retryMaxDelaySlice\" parameter",
+ ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_SUSPEND_WINDOW);
+
+ /**
+ * Controls the frame size of thrift sockets created by Astyanax.
+ */
+ public static final ConfigOption<Integer> THRIFT_FRAME_SIZE =
+ new ConfigOption<Integer>(ASTYANAX_NS, "frame-size",
+ "The thrift frame size in mega bytes", ConfigOption.Type.MASKABLE, 15);
+
+ public static final ConfigOption<String> LOCAL_DATACENTER =
+ new ConfigOption<String>(ASTYANAX_NS, "local-datacenter",
+ "The name of the local or closest Cassandra datacenter. When set and not whitespace, " +
+ "this value will be passed into ConnectionPoolConfigurationImpl.setLocalDatacenter. " +
+ "When unset or set to whitespace, setLocalDatacenter will not be invoked.",
+ /* It's between either LOCAL or MASKABLE. MASKABLE could be useful for cases where
+ all the Titan instances are closest to the same Cassandra DC. */
+ ConfigOption.Type.MASKABLE, String.class);
+
+ private final String clusterName;
+
+ private final AstyanaxContext<Keyspace> keyspaceContext;
+ private final AstyanaxContext<Cluster> clusterContext;
+
+ private final RetryPolicy retryPolicy;
+
+ private final int retryDelaySlice;
+ private final int retryMaxDelaySlice;
+ private final int retrySuspendWindow;
+ private final RetryBackoffStrategy retryBackoffStrategy;
+
+ private final String localDatacenter;
+
+ private final Map<String, AstyanaxKeyColumnValueStore> openStores;
+
+ public AstyanaxStoreManager(Configuration config) throws BackendException {
+ super(config);
+
+ this.clusterName = config.get(CLUSTER_NAME);
+
+ retryDelaySlice = config.get(RETRY_DELAY_SLICE);
+ retryMaxDelaySlice = config.get(RETRY_MAX_DELAY_SLICE);
+ retrySuspendWindow = config.get(RETRY_SUSPEND_WINDOW);
+ retryBackoffStrategy = getRetryBackoffStrategy(config.get(RETRY_BACKOFF_STRATEGY));
+ retryPolicy = getRetryPolicy(config.get(RETRY_POLICY));
+
+ localDatacenter = config.has(LOCAL_DATACENTER) ?
+ config.get(LOCAL_DATACENTER) : "";
+
+ final int maxConnsPerHost = config.get(MAX_CONNECTIONS_PER_HOST);
+
+ final int maxClusterConnsPerHost = config.get(MAX_CLUSTER_CONNECTIONS_PER_HOST);
+
+ this.clusterContext = createCluster(getContextBuilder(config, maxClusterConnsPerHost, "Cluster"));
+
+ ensureKeyspaceExists(clusterContext.getClient());
+
+ this.keyspaceContext = getContextBuilder(config, maxConnsPerHost, "Keyspace").buildKeyspace(ThriftFamilyFactory.getInstance());
+ this.keyspaceContext.start();
+
+ openStores = new HashMap<String, AstyanaxKeyColumnValueStore>(8);
+ }
+
+ @Override
+ public Deployment getDeployment() {
+ return Deployment.REMOTE; // TODO
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public IPartitioner getCassandraPartitioner() throws BackendException {
+ Cluster cl = clusterContext.getClient();
+ try {
+ return FBUtilities.newPartitioner(cl.describePartitioner());
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "astyanax" + super.toString();
+ }
+
+ @Override
+ public void close() {
+ // Shutdown the Astyanax contexts
+ openStores.clear();
+ keyspaceContext.shutdown();
+ clusterContext.shutdown();
+ }
+
+ @Override
+ public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
+ if (openStores.containsKey(name)) return openStores.get(name);
+ else {
+ ensureColumnFamilyExists(name);
+ AstyanaxKeyColumnValueStore store = new AstyanaxKeyColumnValueStore(name, keyspaceContext.getClient(), this, retryPolicy);
+ openStores.put(name, store);
+ return store;
+ }
+ }
+
+ @Override
+ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> batch, StoreTransaction txh) throws BackendException {
+ MutationBatch m = keyspaceContext.getClient().prepareMutationBatch().withAtomicBatch(atomicBatch)
+ .setConsistencyLevel(getTx(txh).getWriteConsistencyLevel().getAstyanax())
+ .withRetryPolicy(retryPolicy.duplicate());
+
+ final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+
+ for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> batchentry : batch.entrySet()) {
+ String storeName = batchentry.getKey();
+ Preconditions.checkArgument(openStores.containsKey(storeName), "Store cannot be found: " + storeName);
+
+ ColumnFamily<ByteBuffer, ByteBuffer> columnFamily = openStores.get(storeName).getColumnFamily();
+
+ Map<StaticBuffer, KCVMutation> mutations = batchentry.getValue();
+ for (Map.Entry<StaticBuffer, KCVMutation> ent : mutations.entrySet()) {
+ // The CLMs for additions and deletions are separated because
+ // Astyanax's operation timestamp cannot be set on a per-delete
+ // or per-addition basis.
+ KCVMutation titanMutation = ent.getValue();
+ ByteBuffer key = ent.getKey().asByteBuffer();
+
+ if (titanMutation.hasDeletions()) {
+ ColumnListMutation<ByteBuffer> dels = m.withRow(columnFamily, key);
+ dels.setTimestamp(commitTime.getDeletionTime(times));
+
+ for (StaticBuffer b : titanMutation.getDeletions())
+ dels.deleteColumn(b.as(StaticBuffer.BB_FACTORY));
+ }
+
+ if (titanMutation.hasAdditions()) {
+ ColumnListMutation<ByteBuffer> upds = m.withRow(columnFamily, key);
+ upds.setTimestamp(commitTime.getAdditionTime(times));
+
+ for (Entry e : titanMutation.getAdditions()) {
+ Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
+
+ if (null != ttl && ttl > 0) {
+ upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY), ttl);
+ } else {
+ upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY));
+ }
+ }
+ }
+ }
+ }
+
+ try {
+ m.execute();
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+
+ sleepAfterWrite(txh, commitTime);
+ }
+
+ @Override
+ public List<KeyRange> getLocalKeyPartition() throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearStorage() throws BackendException {
+ try {
+ Cluster cluster = clusterContext.getClient();
+
+ Keyspace ks = cluster.getKeyspace(keySpaceName);
+
+ // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests).
+ // This is called on per test setup basis to make sure that previous test cleaned
+ // everything up, so first invocation would always fail as Keyspace doesn't yet exist.
+ if (ks == null)
+ return;
+
+ for (ColumnFamilyDefinition cf : cluster.describeKeyspace(keySpaceName).getColumnFamilyList()) {
+ ks.truncateColumnFamily(new ColumnFamily<Object, Object>(cf.getName(), null, null));
+ }
+ } catch (ConnectionException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ private void ensureColumnFamilyExists(String name) throws BackendException {
+ ensureColumnFamilyExists(name, "org.apache.cassandra.db.marshal.BytesType");
+ }
+
+ private void ensureColumnFamilyExists(String name, String comparator) throws BackendException {
+ Cluster cl = clusterContext.getClient();
+ try {
+ KeyspaceDefinition ksDef = cl.describeKeyspace(keySpaceName);
+ boolean found = false;
+ if (null != ksDef) {
+ for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) {
+ found |= cfDef.getName().equals(name);
+ }
+ }
+ if (!found) {
+ ColumnFamilyDefinition cfDef =
+ cl.makeColumnFamilyDefinition()
+ .setName(name)
+ .setKeyspace(keySpaceName)
+ .setComparatorType(comparator);
+
+ ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
+
+ if (compressionEnabled) {
+ compressionOptions.put("sstable_compression", compressionClass)
+ .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
+ }
+
+ cl.addColumnFamily(cfDef.setCompressionOptions(compressionOptions.build()));
+ }
+ } catch (ConnectionException e) {
+ throw new TemporaryBackendException(e);
+ }
+ }
+
+ private static AstyanaxContext<Cluster> createCluster(AstyanaxContext.Builder cb) {
+ AstyanaxContext<Cluster> clusterCtx = cb.buildCluster(ThriftFamilyFactory.getInstance());
+ clusterCtx.start();
+
+ return clusterCtx;
+ }
+
+ private AstyanaxContext.Builder getContextBuilder(Configuration config, int maxConnsPerHost, String usedFor) {
+
+ final ConnectionPoolType poolType = ConnectionPoolType.valueOf(config.get(CONNECTION_POOL_TYPE));
+
+ final NodeDiscoveryType discType = NodeDiscoveryType.valueOf(config.get(NODE_DISCOVERY_TYPE));
+
+ final int maxConnections = config.get(MAX_CONNECTIONS);
+
+ final int maxOperationsPerConnection = config.get(MAX_OPERATIONS_PER_CONNECTION);
+
+ final int connectionTimeout = (int) connectionTimeoutMS.toMillis();
+
+ ConnectionPoolConfigurationImpl cpool =
+ new ConnectionPoolConfigurationImpl(usedFor + "TitanConnectionPool")
+ .setPort(port)
+ .setMaxOperationsPerConnection(maxOperationsPerConnection)
+ .setMaxConnsPerHost(maxConnsPerHost)
+ .setRetryDelaySlice(retryDelaySlice)
+ .setRetryMaxDelaySlice(retryMaxDelaySlice)
+ .setRetrySuspendWindow(retrySuspendWindow)
+ .setSocketTimeout(connectionTimeout)
+ .setConnectTimeout(connectionTimeout)
+ .setSeeds(StringUtils.join(hostnames, ","));
+
+ if (null != retryBackoffStrategy) {
+ cpool.setRetryBackoffStrategy(retryBackoffStrategy);
+ log.debug("Custom RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
+ } else {
+ log.debug("Default RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy());
+ }
+
+ if (StringUtils.isNotBlank(localDatacenter)) {
+ cpool.setLocalDatacenter(localDatacenter);
+ log.debug("Set local datacenter: {}", cpool.getLocalDatacenter());
+ }
+
+ AstyanaxConfigurationImpl aconf =
+ new AstyanaxConfigurationImpl()
+ .setConnectionPoolType(poolType)
+ .setDiscoveryType(discType)
+ .setTargetCassandraVersion("1.2")
+ .setMaxThriftSize(thriftFrameSizeBytes);
+
+ if (0 < maxConnections) {
+ cpool.setMaxConns(maxConnections);
+ }
+
+ if (hasAuthentication()) {
+ cpool.setAuthenticationCredentials(new SimpleAuthenticationCredentials(username, password));
+ }
+
+ if (config.get(SSL_ENABLED)) {
+ cpool.setSSLConnectionContext(new SSLConnectionContext(config.get(SSL_TRUSTSTORE_LOCATION), config.get(SSL_TRUSTSTORE_PASSWORD)));
+ }
+
+ AstyanaxContext.Builder ctxBuilder = new AstyanaxContext.Builder();
+
+ // Standard context builder options
+ ctxBuilder
+ .forCluster(clusterName)
+ .forKeyspace(keySpaceName)
+ .withAstyanaxConfiguration(aconf)
+ .withConnectionPoolConfiguration(cpool)
+ .withConnectionPoolMonitor(new CountingConnectionPoolMonitor());
+
+ // Conditional context builder option: host supplier
+ if (config.has(HOST_SUPPLIER)) {
+ String hostSupplier = config.get(HOST_SUPPLIER);
+ Supplier<List<Host>> supplier = null;
+ if (hostSupplier != null) {
+ try {
+ supplier = (Supplier<List<Host>>) Class.forName(hostSupplier).newInstance();
+ ctxBuilder.withHostSupplier(supplier);
+ } catch (Exception e) {
+ log.warn("Problem with host supplier class " + hostSupplier + ", going to use default.", e);
+ }
+ }
+ }
+
+ return ctxBuilder;
+ }
+
+ private void ensureKeyspaceExists(Cluster cl) throws BackendException {
+ KeyspaceDefinition ksDef;
+
+ try {
+ ksDef = cl.describeKeyspace(keySpaceName);
+
+ if (null != ksDef && ksDef.getName().equals(keySpaceName)) {
+ log.debug("Found keyspace {}", keySpaceName);
+ return;
+ }
+ } catch (ConnectionException e) {
+ log.debug("Failed to describe keyspace {}", keySpaceName);
+ }
+
+ log.debug("Creating keyspace {}...", keySpaceName);
+ try {
+ ksDef = cl.makeKeyspaceDefinition()
+ .setName(keySpaceName)
+ .setStrategyClass(storageConfig.get(REPLICATION_STRATEGY))
+ .setStrategyOptions(strategyOptions);
+ cl.addKeyspace(ksDef);
+
+ log.debug("Created keyspace {}", keySpaceName);
+ } catch (ConnectionException e) {
+ log.debug("Failed to create keyspace {}", keySpaceName);
+ throw new TemporaryBackendException(e);
+ }
+ }
+
+ private static RetryBackoffStrategy getRetryBackoffStrategy(String desc) throws PermanentBackendException {
+ if (null == desc)
+ return null;
+
+ String[] tokens = desc.split(",");
+ String policyClassName = tokens[0];
+ int argCount = tokens.length - 1;
+ Integer[] args = new Integer[argCount];
+
+ for (int i = 1; i < tokens.length; i++) {
+ args[i - 1] = Integer.valueOf(tokens[i]);
+ }
+
+ try {
+ RetryBackoffStrategy rbs = instantiate(policyClassName, args, desc);
+ log.debug("Instantiated RetryBackoffStrategy object {} from config string \"{}\"", rbs, desc);
+ return rbs;
+ } catch (Exception e) {
+ throw new PermanentBackendException("Failed to instantiate Astyanax RetryBackoffStrategy implementation", e);
+ }
+ }
+
+ private static RetryPolicy getRetryPolicy(String serializedRetryPolicy) throws BackendException {
+ String[] tokens = serializedRetryPolicy.split(",");
+ String policyClassName = tokens[0];
+ int argCount = tokens.length - 1;
+ Integer[] args = new Integer[argCount];
+ for (int i = 1; i < tokens.length; i++) {
+ args[i - 1] = Integer.valueOf(tokens[i]);
+ }
+
+ try {
+ RetryPolicy rp = instantiate(policyClassName, args, serializedRetryPolicy);
+ log.debug("Instantiated RetryPolicy object {} from config string \"{}\"", rp, serializedRetryPolicy);
+ return rp;
+ } catch (Exception e) {
+ throw new PermanentBackendException("Failed to instantiate Astyanax Retry Policy class", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <V> V instantiate(String policyClassName, Integer[] args, String raw) throws Exception {
+ for (Constructor<?> con : Class.forName(policyClassName).getConstructors()) {
+ Class<?>[] parameterTypes = con.getParameterTypes();
+
+ // match constructor by number of arguments first
+ if (args.length != parameterTypes.length)
+ continue;
+
+ // check if the constructor parameter types are compatible with argument types (which are integer)
+ // note that we allow long.class arguments too because integer is cast to long by runtime.
+ boolean intsOrLongs = true;
+ for (Class<?> pc : parameterTypes) {
+ if (!pc.equals(int.class) && !pc.equals(long.class)) {
+ intsOrLongs = false;
+ break;
+ }
+ }
+
+ // we found a constructor with required number of parameters but times didn't match, let's carry on
+ if (!intsOrLongs)
+ continue;
+
+ if (log.isDebugEnabled())
+ log.debug("About to instantiate class {} with {} arguments", con.toString(), args.length);
+
+ return (V) con.newInstance(args);
+ }
+
+ throw new Exception("Failed to identify a class matching the Astyanax Retry Policy config string \"" + raw + "\"");
+ }
+
+ @Override
+ public Map<String, String> getCompressionOptions(String cf) throws BackendException {
+ try {
+ Keyspace k = keyspaceContext.getClient();
+
+ KeyspaceDefinition kdef = k.describeKeyspace();
+
+ if (null == kdef) {
+ throw new PermanentBackendException("Keyspace " + k.getKeyspaceName() + " is undefined");
+ }
+
+ ColumnFamilyDefinition cfdef = kdef.getColumnFamily(cf);
+
+ if (null == cfdef) {
+ throw new PermanentBackendException("Column family " + cf + " is undefined");
+ }
+
+ return cfdef.getCompressionOptions();
+ } catch (ConnectionException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+}
+
+
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java
new file mode 100644
index 0000000..67eec14
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java
@@ -0,0 +1,527 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.IsBootstrappingException;
+import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+public class CassandraEmbeddedKeyColumnValueStore implements KeyColumnValueStore {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedKeyColumnValueStore.class);
+
+ private final String keyspace;
+ private final String columnFamily;
+ private final CassandraEmbeddedStoreManager storeManager;
+ private final TimestampProvider times;
+ private final CassandraEmbeddedGetter entryGetter;
+
+ public CassandraEmbeddedKeyColumnValueStore(
+ String keyspace,
+ String columnFamily,
+ CassandraEmbeddedStoreManager storeManager) throws RuntimeException {
+ this.keyspace = keyspace;
+ this.columnFamily = columnFamily;
+ this.storeManager = storeManager;
+ this.times = this.storeManager.getTimestampProvider();
+ entryGetter = new CassandraEmbeddedGetter(storeManager.getMetaDataSchema(columnFamily),times);
+ }
+
+ @Override
+ public void close() throws BackendException {
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column,
+ StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
+ IPartitioner partitioner = StorageService.getPartitioner();
+
+ // see rant about this in Astyanax implementation
+ if (partitioner instanceof RandomPartitioner || partitioner instanceof Murmur3Partitioner)
+ throw new PermanentBackendException("This operation is only supported when byte-ordered partitioner is used.");
+
+ return new RowIterator(keyRangeQuery, storeManager.getPageSize(), txh);
+ }
+
+ @Override
+ public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException {
+ return new RowIterator(getMinimumToken(), getMaximumToken(), query, storeManager.getPageSize(), txh);
+ }
+
+
+ /**
+ * Create a RangeSliceCommand and run it against the StorageProxy.
+ * <p>
+ * To match the behavior of the standard Cassandra thrift API endpoint, the
+ * {@code nowMillis} argument should be the number of milliseconds since the
+ * UNIX Epoch (e.g. System.currentTimeMillis() or equivalent obtained
+ * through a {@link TimestampProvider}). This is per
+ * {@link org.apache.cassandra.thrift.CassandraServer#get_range_slices(ColumnParent, SlicePredicate, KeyRange, ConsistencyLevel)},
+ * which passes the server's System.currentTimeMillis() to the
+ * {@code RangeSliceCommand} constructor.
+ */
+ private List<Row> getKeySlice(Token start,
+ Token end,
+ @Nullable SliceQuery sliceQuery,
+ int pageSize,
+ long nowMillis) throws BackendException {
+ IPartitioner partitioner = StorageService.getPartitioner();
+
+ SliceRange columnSlice = new SliceRange();
+ if (sliceQuery == null) {
+ columnSlice.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setCount(5);
+ } else {
+ columnSlice.setStart(sliceQuery.getSliceStart().asByteBuffer())
+ .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
+ .setCount(sliceQuery.hasLimit() ? sliceQuery.getLimit() : Integer.MAX_VALUE);
+ }
+ /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(columnSlice);
+
+ RowPosition startPosition = start.minKeyBound(partitioner);
+ RowPosition endPosition = end.minKeyBound(partitioner);
+
+ List<Row> rows;
+
+ try {
+ CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, cfm, null);
+
+ RangeSliceCommand cmd = new RangeSliceCommand(keyspace, columnFamily, nowMillis, filter, new Bounds<RowPosition>(startPosition, endPosition), pageSize);
+
+ rows = StorageProxy.getRangeSlice(cmd, ConsistencyLevel.QUORUM);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ }
+
+ return rows;
+ }
+
+ @Override
+ public String getName() {
+ return columnFamily;
+ }
+
+ @Override
+ public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+
+ /**
+ * This timestamp mimics the timestamp used by
+ * {@link org.apache.cassandra.thrift.CassandraServer#get(ByteBuffer,ColumnPath,ConsistencyLevel)}.
+ *
+ * That method passes the server's System.currentTimeMillis() to
+ * {@link ReadCommand#create(String, ByteBuffer, String, long, IDiskAtomFilter)}.
+ * {@code create(...)} in turn passes that timestamp to the SliceFromReadCommand constructor.
+ */
+ final long nowMillis = times.getTime().toEpochMilli();
+ Composite startComposite = CellNames.simpleDense(query.getSliceStart().asByteBuffer());
+ Composite endComposite = CellNames.simpleDense(query.getSliceEnd().asByteBuffer());
+ SliceQueryFilter sqf = new SliceQueryFilter(startComposite, endComposite,
+ false, query.getLimit() + (query.hasLimit()?1:0));
+ ReadCommand sliceCmd = new SliceFromReadCommand(keyspace, query.getKey().asByteBuffer(), columnFamily, nowMillis, sqf);
+
+ List<Row> slice = read(sliceCmd, getTx(txh).getReadConsistencyLevel().getDB());
+
+ if (null == slice || 0 == slice.size())
+ return EntryList.EMPTY_LIST;
+
+ int sliceSize = slice.size();
+ if (1 < sliceSize)
+ throw new PermanentBackendException("Received " + sliceSize + " rows for single key");
+
+ Row r = slice.get(0);
+
+ if (null == r) {
+ log.warn("Null Row object retrieved from Cassandra StorageProxy");
+ return EntryList.EMPTY_LIST;
+ }
+
+ ColumnFamily cf = r.cf;
+
+ if (null == cf) {
+ log.debug("null ColumnFamily (\"{}\")", columnFamily);
+ return EntryList.EMPTY_LIST;
+ }
+
+ if (cf.isMarkedForDelete())
+ return EntryList.EMPTY_LIST;
+
+ return CassandraHelper.makeEntryList(
+ Iterables.filter(cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
+ entryGetter,
+ query.getSliceEnd(),
+ query.getLimit());
+
+ }
+
+ private class FilterDeletedColumns implements Predicate<Cell> {
+
+ private final long tsMillis;
+ private final int tsSeconds;
+
+ private FilterDeletedColumns(long tsMillis) {
+ this.tsMillis = tsMillis;
+ this.tsSeconds = (int)(this.tsMillis / 1000L);
+ }
+
+ @Override
+ public boolean apply(Cell input) {
+ if (!input.isLive(tsMillis))
+ return false;
+
+ // Don't do this. getTimeToLive() is a duration divorced from any particular clock.
+ // For instance, if TTL=10 seconds, getTimeToLive() will have value 10 (not 10 + epoch seconds), and
+ // this will always return false.
+ //if (input instanceof ExpiringCell)
+ // return tsSeconds < ((ExpiringCell)input).getTimeToLive();
+
+ return true;
+ }
+ }
+
+ @Override
+ public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List<Entry> additions,
+ List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new
+ KCVMutation(additions, deletions));
+ mutateMany(mutations, txh);
+ }
+
+
+ public void mutateMany(Map<StaticBuffer, KCVMutation> mutations,
+ StoreTransaction txh) throws BackendException {
+ storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
+ }
+
+ private static List<Row> read(ReadCommand cmd, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
+ ArrayList<ReadCommand> cmdHolder = new ArrayList<ReadCommand>(1);
+ cmdHolder.add(cmd);
+ return read(cmdHolder, clvl);
+ }
+
+ private static List<Row> read(List<ReadCommand> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
+ try {
+ return StorageProxy.read(cmds, clvl);
+ } catch (UnavailableException e) {
+ throw new TemporaryBackendException(e);
+ } catch (RequestTimeoutException e) {
+ throw new PermanentBackendException(e);
+ } catch (IsBootstrappingException e) {
+ throw new TemporaryBackendException(e);
+ } catch (InvalidRequestException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ private static class CassandraEmbeddedGetter implements StaticArrayEntry.GetColVal<Cell,ByteBuffer> {
+
+ private final EntryMetaData[] schema;
+ private final TimestampProvider times;
+
+ private CassandraEmbeddedGetter(EntryMetaData[] schema, TimestampProvider times) {
+ this.schema = schema;
+ this.times = times;
+ }
+
+ @Override
+ public ByteBuffer getColumn(Cell element) {
+ return org.apache.cassandra.utils.ByteBufferUtil.clone(element.name().toByteBuffer());
+ }
+
+ @Override
+ public ByteBuffer getValue(Cell element) {
+ return org.apache.cassandra.utils.ByteBufferUtil.clone(element.value());
+ }
+
+ @Override
+ public EntryMetaData[] getMetaSchema(Cell element) {
+ return schema;
+ }
+
+ @Override
+ public Object getMetaData(Cell element, EntryMetaData meta) {
+ switch (meta) {
+ case TIMESTAMP:
+ return element.timestamp();
+ case TTL:
+ return ((element instanceof ExpiringCell)
+ ? ((ExpiringCell) element).getTimeToLive()
+ : 0);
+ default:
+ throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+ }
+ }
+ }
+
+ private class RowIterator implements KeyIterator {
+ private final Token maximumToken;
+ private final SliceQuery sliceQuery;
+ private final StoreTransaction txh;
+
+ /**
+ * This RowIterator will use this timestamp for its entire lifetime,
+ * even if the iterator runs more than one distinct slice query while
+ * paging. <b>This field must be in units of milliseconds since
+ * the UNIX Epoch</b>.
+ * <p>
+ * This timestamp is passed to three methods/constructors:
+ * <ul>
+ * <li>{@link org.apache.cassandra.db.Column#isMarkedForDelete(long now)}</li>
+ * <li>{@link org.apache.cassandra.db.ColumnFamily#hasOnlyTombstones(long)}</li>
+ * <li>
+ * the {@link RangeSliceCommand} constructor via the last argument
+ * to {@link CassandraEmbeddedKeyColumnValueStore#getKeySlice(Token, Token, SliceQuery, int, long)}
+ * </li>
+ * </ul>
+ * The second list entry just calls the first and almost doesn't deserve
+ * a mention at present, but maybe the implementation will change in the future.
+ * <p>
+ * When this value needs to be compared to TTL seconds expressed in seconds,
+ * Cassandra internals do the conversion.
+ * Consider {@link ExpiringColumn#isMarkedForDelete(long)}, which is implemented,
+ * as of 2.0.6, by the following one-liner:
+ * <p>
+ * {@code return (int) (now / 1000) >= getLocalDeletionTime()}
+ * <p>
+ * The {@code now / 1000} does the conversion from milliseconds to seconds
+ * (the units of getLocalDeletionTime()).
+ */
+ private final long nowMillis;
+
+ private Iterator<Row> keys;
+ private ByteBuffer lastSeenKey = null;
+ private Row currentRow;
+ private int pageSize;
+
+ private boolean isClosed;
+
+ public RowIterator(KeyRangeQuery keyRangeQuery, int pageSize, StoreTransaction txh) throws BackendException {
+ this(StorageService.getPartitioner().getToken(keyRangeQuery.getKeyStart().asByteBuffer()),
+ StorageService.getPartitioner().getToken(keyRangeQuery.getKeyEnd().asByteBuffer()),
+ keyRangeQuery,
+ pageSize,
+ txh);
+ }
+
+ public RowIterator(Token minimum, Token maximum, SliceQuery sliceQuery, int pageSize, StoreTransaction txh) throws BackendException {
+ this.pageSize = pageSize;
+ this.sliceQuery = sliceQuery;
+ this.maximumToken = maximum;
+ this.txh = txh;
+ this.nowMillis = times.getTime().toEpochMilli();
+ this.keys = getRowsIterator(getKeySlice(minimum, maximum, sliceQuery, pageSize, nowMillis));
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return hasNextInternal();
+ } catch (BackendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ currentRow = keys.next();
+ ByteBuffer currentKey = currentRow.key.getKey().duplicate();
+
+ try {
+ return StaticArrayBuffer.of(currentKey);
+ } finally {
+ lastSeenKey = currentKey;
+ }
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RecordIterator<Entry> getEntries() {
+ ensureOpen();
+
+ if (sliceQuery == null)
+ throw new IllegalStateException("getEntries() requires SliceQuery to be set.");
+
+ return new RecordIterator<Entry>() {
+ final Iterator<Entry> columns = CassandraHelper.makeEntryIterator(
+ Iterables.filter(currentRow.cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)),
+ entryGetter,
+ sliceQuery.getSliceEnd(),
+ sliceQuery.getLimit());
+
+ //cfToEntries(currentRow.cf, sliceQuery).iterator();
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return columns.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ return columns.next();
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ }
+
+ private final boolean hasNextInternal() throws BackendException {
+ ensureOpen();
+
+ if (keys == null)
+ return false;
+
+ boolean hasNext = keys.hasNext();
+
+ if (!hasNext && lastSeenKey != null) {
+ Token lastSeenToken = StorageService.getPartitioner().getToken(lastSeenKey.duplicate());
+
+ // let's check if we reached key upper bound already so we can skip one useless call to Cassandra
+ if (maximumToken != getMinimumToken() && lastSeenToken.equals(maximumToken)) {
+ return false;
+ }
+
+ List<Row> newKeys = getKeySlice(StorageService.getPartitioner().getToken(lastSeenKey), maximumToken, sliceQuery, pageSize, nowMillis);
+
+ keys = getRowsIterator(newKeys, lastSeenKey);
+ hasNext = keys.hasNext();
+ }
+
+ return hasNext;
+ }
+
+ private void ensureOpen() {
+ if (isClosed)
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+
+ private Iterator<Row> getRowsIterator(List<Row> rows) {
+ if (rows == null)
+ return null;
+
+ return Iterators.filter(rows.iterator(), new Predicate<Row>() {
+ @Override
+ public boolean apply(@Nullable Row row) {
+ // The hasOnlyTombstones(x) call below ultimately calls Column.isMarkedForDelete(x)
+ return !(row == null || row.cf == null || row.cf.isMarkedForDelete() || row.cf.hasOnlyTombstones(nowMillis));
+ }
+ });
+ }
+
+ private Iterator<Row> getRowsIterator(List<Row> rows, final ByteBuffer exceptKey) {
+ Iterator<Row> rowIterator = getRowsIterator(rows);
+
+ if (rowIterator == null)
+ return null;
+
+ return Iterators.filter(rowIterator, new Predicate<Row>() {
+ @Override
+ public boolean apply(@Nullable Row row) {
+ return row != null && !row.key.getKey().equals(exceptKey);
+ }
+ });
+ }
+ }
+
+ private static Token getMinimumToken() throws PermanentBackendException {
+ IPartitioner partitioner = StorageService.getPartitioner();
+
+ if (partitioner instanceof RandomPartitioner) {
+ return ((RandomPartitioner) partitioner).getMinimumToken();
+ } else if (partitioner instanceof Murmur3Partitioner) {
+ return ((Murmur3Partitioner) partitioner).getMinimumToken();
+ } else if (partitioner instanceof ByteOrderedPartitioner) {
+ //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
+ return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.zeroByteBuffer(8));
+ } else {
+ throw new PermanentBackendException("Unsupported partitioner: " + partitioner);
+ }
+ }
+
+ private static Token getMaximumToken() throws PermanentBackendException {
+ IPartitioner partitioner = StorageService.getPartitioner();
+
+ if (partitioner instanceof RandomPartitioner) {
+ return new BigIntegerToken(RandomPartitioner.MAXIMUM);
+ } else if (partitioner instanceof Murmur3Partitioner) {
+ return new LongToken(Murmur3Partitioner.MAXIMUM);
+ } else if (partitioner instanceof ByteOrderedPartitioner) {
+ //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys)
+ return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.oneByteBuffer(8));
+ } else {
+ throw new PermanentBackendException("Unsupported partitioner: " + partitioner);
+ }
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java
new file mode 100644
index 0000000..cecc92f
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java
@@ -0,0 +1,399 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+
+import org.apache.cassandra.cache.CachingOptions;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.scheduler.IRequestScheduler;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraDaemonWrapper;
+
+public class CassandraEmbeddedStoreManager extends AbstractCassandraStoreManager {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedStoreManager.class);
+
+ /**
+ * The default value for
+ * {@link GraphDatabaseConfiguration#STORAGE_CONF_FILE}.
+ */
+ public static final String CASSANDRA_YAML_DEFAULT = "./conf/cassandra.yaml";
+
+ private final Map<String, CassandraEmbeddedKeyColumnValueStore> openStores;
+
+ private final IRequestScheduler requestScheduler;
+
+ public CassandraEmbeddedStoreManager(Configuration config) throws BackendException {
+ super(config);
+
+ String cassandraConfig = CASSANDRA_YAML_DEFAULT;
+ if (config.has(GraphDatabaseConfiguration.STORAGE_CONF_FILE)) {
+ cassandraConfig = config.get(GraphDatabaseConfiguration.STORAGE_CONF_FILE);
+ }
+
+ assert cassandraConfig != null && !cassandraConfig.isEmpty();
+
+ File ccf = new File(cassandraConfig);
+
+ if (ccf.exists() && ccf.isAbsolute()) {
+ cassandraConfig = "file://" + cassandraConfig;
+ log.debug("Set cassandra config string \"{}\"", cassandraConfig);
+ }
+
+ CassandraDaemonWrapper.start(cassandraConfig);
+
+ this.openStores = new HashMap<String, CassandraEmbeddedKeyColumnValueStore>(8);
+ this.requestScheduler = DatabaseDescriptor.getRequestScheduler();
+ }
+
+ @Override
+ public Deployment getDeployment() {
+ return Deployment.EMBEDDED;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public IPartitioner getCassandraPartitioner()
+ throws BackendException {
+ try {
+ return StorageService.getPartitioner();
+ } catch (Exception e) {
+ log.warn("Could not read local token range: {}", e);
+ throw new PermanentBackendException("Could not read partitioner information on cluster", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "embeddedCassandra" + super.toString();
+ }
+
+ @Override
+ public void close() {
+ openStores.clear();
+ CassandraDaemonWrapper.stop();
+ }
+
+ @Override
+ public synchronized KeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException {
+ if (openStores.containsKey(name))
+ return openStores.get(name);
+
+ // Ensure that both the keyspace and column family exist
+ ensureKeyspaceExists(keySpaceName);
+ ensureColumnFamilyExists(keySpaceName, name);
+
+ CassandraEmbeddedKeyColumnValueStore store = new CassandraEmbeddedKeyColumnValueStore(keySpaceName, name, this);
+ openStores.put(name, store);
+ return store;
+ }
+
+ /*
+ * Raw type warnings are suppressed in this method because
+ * {@link StorageService#getLocalPrimaryRanges(String)} returns a raw
+ * (unparameterized) type.
+ */
+ public List<KeyRange> getLocalKeyPartition() throws BackendException {
+ ensureKeyspaceExists(keySpaceName);
+
+ @SuppressWarnings("rawtypes")
+ Collection<Range<Token>> ranges = StorageService.instance.getPrimaryRanges(keySpaceName);
+
+ List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
+
+ for (@SuppressWarnings("rawtypes") Range<Token> range : ranges) {
+ keyRanges.add(CassandraHelper.transformRange(range));
+ }
+
+ return keyRanges;
+ }
+
+ /*
+ * This implementation can't handle counter columns.
+ *
+ * The private method internal_batch_mutate in CassandraServer as of 1.2.0
+ * provided most of the following method after transaction handling.
+ */
+ @Override
+ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
+ Preconditions.checkNotNull(mutations);
+
+ final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+
+ int size = 0;
+ for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size();
+ Map<StaticBuffer, org.apache.cassandra.db.Mutation> rowMutations = new HashMap<>(size);
+
+ for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> mutEntry : mutations.entrySet()) {
+ String columnFamily = mutEntry.getKey();
+ for (Map.Entry<StaticBuffer, KCVMutation> titanMutation : mutEntry.getValue().entrySet()) {
+ StaticBuffer key = titanMutation.getKey();
+ KCVMutation mut = titanMutation.getValue();
+
+ org.apache.cassandra.db.Mutation rm = rowMutations.get(key);
+ if (rm == null) {
+ rm = new org.apache.cassandra.db.Mutation(keySpaceName, key.asByteBuffer());
+ rowMutations.put(key, rm);
+ }
+
+ if (mut.hasAdditions()) {
+ for (Entry e : mut.getAdditions()) {
+ Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL);
+
+ if (null != ttl && ttl > 0) {
+ rm.add(columnFamily, CellNames.simpleDense(e.getColumnAs(StaticBuffer.BB_FACTORY)),
+ e.getValueAs(StaticBuffer.BB_FACTORY), commitTime.getAdditionTime(times), ttl);
+ } else {
+ rm.add(columnFamily, CellNames.simpleDense(e.getColumnAs(StaticBuffer.BB_FACTORY)),
+ e.getValueAs(StaticBuffer.BB_FACTORY), commitTime.getAdditionTime(times));
+ }
+ }
+ }
+
+ if (mut.hasDeletions()) {
+ for (StaticBuffer col : mut.getDeletions()) {
+ rm.delete(columnFamily, CellNames.simpleDense(col.as(StaticBuffer.BB_FACTORY)),
+ commitTime.getDeletionTime(times));
+ }
+ }
+
+ }
+ }
+
+ mutate(new ArrayList<org.apache.cassandra.db.Mutation>(rowMutations.values()), getTx(txh).getWriteConsistencyLevel().getDB());
+
+ sleepAfterWrite(txh, commitTime);
+ }
+
+ private void mutate(List<org.apache.cassandra.db.Mutation> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException {
+ try {
+ schedule(DatabaseDescriptor.getRpcTimeout());
+ try {
+ if (atomicBatch) {
+ StorageProxy.mutateAtomically(cmds, clvl);
+ } else {
+ StorageProxy.mutate(cmds, clvl);
+ }
+ } catch (RequestExecutionException e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ release();
+ }
+ } catch (TimeoutException ex) {
+ log.debug("Cassandra TimeoutException", ex);
+ throw new TemporaryBackendException(ex);
+ }
+ }
+
+ private void schedule(long timeoutMS) throws TimeoutException {
+ requestScheduler.queue(Thread.currentThread(), "default", DatabaseDescriptor.getRpcTimeout());
+ }
+
+ /**
+ * Release count for the used up resources
+ */
+ private void release() {
+ requestScheduler.release();
+ }
+
+ @Override
+ public void clearStorage() throws BackendException {
+ openStores.clear();
+ try {
+ KSMetaData ksMetaData = Schema.instance.getKSMetaData(keySpaceName);
+
+ // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests).
+ // This is called on per test setup basis to make sure that previous test cleaned
+ // everything up, so first invocation would always fail as Keyspace doesn't yet exist.
+ if (ksMetaData == null)
+ return;
+
+ for (String cfName : ksMetaData.cfMetaData().keySet())
+ StorageService.instance.truncate(keySpaceName, cfName);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ private void ensureKeyspaceExists(String keyspaceName) throws BackendException {
+
+ if (null != Schema.instance.getKeyspaceInstance(keyspaceName))
+ return;
+
+ // Keyspace not found; create it
+ String strategyName = storageConfig.get(REPLICATION_STRATEGY);
+
+ KSMetaData ksm;
+ try {
+ ksm = KSMetaData.newKeyspace(keyspaceName, strategyName, strategyOptions, true);
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException("Failed to instantiate keyspace metadata for " + keyspaceName, e);
+ }
+ try {
+ MigrationManager.announceNewKeyspace(ksm);
+ log.info("Created keyspace {}", keyspaceName);
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException("Failed to create keyspace " + keyspaceName, e);
+ }
+ }
+
+ private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException {
+ ensureColumnFamilyExists(ksName, cfName, BytesType.instance);
+ }
+
+ private void ensureColumnFamilyExists(String keyspaceName, String columnfamilyName, AbstractType<?> comparator) throws BackendException {
+ if (null != Schema.instance.getCFMetaData(keyspaceName, columnfamilyName))
+ return;
+
+ // Column Family not found; create it
+ CFMetaData cfm = new CFMetaData(keyspaceName, columnfamilyName, ColumnFamilyType.Standard, CellNames.fromAbstractType(comparator, true));
+
+ // Hard-coded caching settings
+ if (columnfamilyName.startsWith(Backend.EDGESTORE_NAME)) {
+ cfm.caching(CachingOptions.KEYS_ONLY);
+ } else if (columnfamilyName.startsWith(Backend.INDEXSTORE_NAME)) {
+ cfm.caching(CachingOptions.ROWS_ONLY);
+ }
+
+ // Configure sstable compression
+ final CompressionParameters cp;
+ if (compressionEnabled) {
+ try {
+ cp = new CompressionParameters(compressionClass,
+ compressionChunkSizeKB * 1024,
+ Collections.<String, String>emptyMap());
+ // CompressionParameters doesn't override toString(), so be explicit
+ log.debug("Creating CF {}: setting {}={} and {}={} on {}",
+ new Object[]{
+ columnfamilyName,
+ CompressionParameters.SSTABLE_COMPRESSION, compressionClass,
+ CompressionParameters.CHUNK_LENGTH_KB, compressionChunkSizeKB,
+ cp});
+ } catch (ConfigurationException ce) {
+ throw new PermanentBackendException(ce);
+ }
+ } else {
+ cp = new CompressionParameters(null);
+ log.debug("Creating CF {}: setting {} to null to disable compression",
+ columnfamilyName, CompressionParameters.SSTABLE_COMPRESSION);
+ }
+ cfm.compressionParameters(cp);
+
+ try {
+ cfm.addDefaultIndexNames();
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException("Failed to create column family metadata for " + keyspaceName + ":" + columnfamilyName, e);
+ }
+ try {
+ MigrationManager.announceNewColumnFamily(cfm);
+ log.info("Created CF {} in KS {}", columnfamilyName, keyspaceName);
+ } catch (ConfigurationException e) {
+ throw new PermanentBackendException("Failed to create column family " + keyspaceName + ":" + columnfamilyName, e);
+ }
+
+ /*
+ * I'm chasing a nondetermistic exception that appears only rarely on my
+ * machine when executing the embedded cassandra tests. If these dummy
+ * reads ever actually fail and dump a log message, it could help debug
+ * the root cause.
+ *
+ * java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown table/cf pair (InternalCassandraEmbeddedKeyColumnValueTest.testStore1)
+ * at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1582)
+ * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
+ * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
+ * at java.lang.Thread.run(Thread.java:744)
+ * Caused by: java.lang.IllegalArgumentException: Unknown table/cf pair (InternalCassandraEmbeddedKeyColumnValueTest.testStore1)
+ * at org.apache.cassandra.db.Table.getColumnFamilyStore(Table.java:166)
+ * at org.apache.cassandra.db.Table.getRow(Table.java:354)
+ * at org.apache.cassandra.db.SliceFromReadCommand.getRow(SliceFromReadCommand.java:70)
+ * at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:1052)
+ * at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1578)
+ * ... 3 more
+ */
+ retryDummyRead(keyspaceName, columnfamilyName);
+ }
+
+ @Override
+ public Map<String, String> getCompressionOptions(String cf) throws BackendException {
+
+ CFMetaData cfm = Schema.instance.getCFMetaData(keySpaceName, cf);
+
+ if (cfm == null)
+ return null;
+
+ return ImmutableMap.copyOf(cfm.compressionParameters().asThriftOptions());
+ }
+
+ private void retryDummyRead(String ks, String cf) throws PermanentBackendException {
+
+ final long limit = System.currentTimeMillis() + (60L * 1000L);
+
+ while (System.currentTimeMillis() < limit) {
+ try {
+ SortedSet<CellName> names = new TreeSet<>(new Comparator<CellName>() {
+ // This is a singleton set. We need to define a comparator because SimpleDenseCellName is not
+ // comparable, but it doesn't have to be a useful comparator
+ @Override
+ public int compare(CellName o1, CellName o2)
+ {
+ return 0;
+ }
+ });
+ names.add(CellNames.simpleDense(ByteBufferUtil.zeroByteBuffer(1)));
+ NamesQueryFilter nqf = new NamesQueryFilter(names);
+ SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(ks, ByteBufferUtil.zeroByteBuffer(1), cf, 1L, nqf);
+ StorageProxy.read(ImmutableList.<ReadCommand> of(cmd), ConsistencyLevel.QUORUM);
+ log.info("Read on CF {} in KS {} succeeded", cf, ks);
+ return;
+ } catch (Throwable t) {
+ log.warn("Failed to read CF {} in KS {} following creation", cf, ks, t);
+ }
+
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ throw new PermanentBackendException(e);
+ }
+ }
+
+ throw new PermanentBackendException("Timed out while attempting to read CF " + cf + " in KS " + ks + " following creation");
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java
new file mode 100644
index 0000000..e44ab92
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java
@@ -0,0 +1,536 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+/**
+ * A Titan {@code KeyColumnValueStore} backed by Cassandra.
+ * This uses the Cassandra Thrift API.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ * @see CassandraThriftStoreManager
+ */
+public class CassandraThriftKeyColumnValueStore implements KeyColumnValueStore {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(CassandraThriftKeyColumnValueStore.class);
+
+ private static final Pattern BROKEN_BYTE_TOKEN_PATTERN = Pattern.compile("^Token\\(bytes\\[(.+)\\]\\)$");
+
+ // Cassandra access
+ private final CassandraThriftStoreManager storeManager;
+ private final String keyspace;
+ private final String columnFamily;
+ private final CTConnectionPool pool;
+ private final ThriftGetter entryGetter;
+
+ public CassandraThriftKeyColumnValueStore(String keyspace, String columnFamily, CassandraThriftStoreManager storeManager,
+ CTConnectionPool pool) {
+ this.storeManager = storeManager;
+ this.keyspace = keyspace;
+ this.columnFamily = columnFamily;
+ this.pool = pool;
+ this.entryGetter = new ThriftGetter(storeManager.getMetaDataSchema(columnFamily));
+ }
+
+ /**
+ * Call Cassandra's Thrift get_slice() method.
+ * <p/>
+ * When columnEnd equals columnStart and either startInclusive
+ * or endInclusive is false (or both are false), then this
+ * method returns an empty list without making any Thrift calls.
+ * <p/>
+ * If columnEnd = columnStart + 1, and both startInclusive and
+ * startExclusive are false, then the arguments effectively form
+ * an empty interval. In this case, as in the one previous,
+ * an empty list is returned. However, it may not necessarily
+ * be handled efficiently; a Thrift call might still be made
+ * before returning the empty list.
+ *
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException
+ * when columnEnd < columnStart
+ */
+ @Override
+ public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
+ return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
+ }
+
+ @Override
+ public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(keys, query, txh);
+ }
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
+ SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(ImmutableList.of(key),query,txh);
+ }
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
+ SliceQuery query,
+ StoreTransaction txh) throws BackendException {
+ ColumnParent parent = new ColumnParent(columnFamily);
+ /*
+ * Cassandra cannot handle columnStart = columnEnd.
+ * Cassandra's Thrift getSlice() throws InvalidRequestException
+ * if columnStart = columnEnd.
+ */
+ if (query.getSliceStart().compareTo(query.getSliceEnd()) >= 0) {
+ // Check for invalid arguments where columnEnd < columnStart
+ if (query.getSliceEnd().compareTo(query.getSliceStart())<0) {
+ throw new PermanentBackendException("columnStart=" + query.getSliceStart() +
+ " is greater than columnEnd=" + query.getSliceEnd() + ". " +
+ "columnStart must be less than or equal to columnEnd");
+ }
+ if (0 != query.getSliceStart().length() && 0 != query.getSliceEnd().length()) {
+ logger.debug("Return empty list due to columnEnd==columnStart and neither empty");
+ return KCVSUtil.emptyResults(keys);
+ }
+ }
+
+ assert query.getSliceStart().compareTo(query.getSliceEnd()) < 0;
+ ConsistencyLevel consistency = getTx(txh).getReadConsistencyLevel().getThrift();
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange();
+ range.setCount(query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
+ range.setStart(query.getSliceStart().asByteBuffer());
+ range.setFinish(query.getSliceEnd().asByteBuffer());
+ predicate.setSlice_range(range);
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(keyspace);
+ Cassandra.Client client = conn.getClient();
+ Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(CassandraHelper.convert(keys),
+ parent,
+ predicate,
+ consistency);
+
+ /*
+ * The final size of the "result" List may be at most rows.size().
+ * However, "result" could also be up to two elements smaller than
+ * rows.size(), depending on startInclusive and endInclusive
+ */
+ Map<StaticBuffer, EntryList> results = new HashMap<StaticBuffer, EntryList>();
+
+ for (ByteBuffer key : rows.keySet()) {
+ results.put(StaticArrayBuffer.of(key),
+ CassandraHelper.makeEntryList(rows.get(key), entryGetter, query.getSliceEnd(), query.getLimit()));
+ }
+
+ return results;
+ } catch (Exception e) {
+ throw convertException(e);
+ } finally {
+ pool.returnObjectUnsafe(keyspace, conn);
+ }
+ }
+
+ private static class ThriftGetter implements StaticArrayEntry.GetColVal<ColumnOrSuperColumn,ByteBuffer> {
+
+ private final EntryMetaData[] schema;
+
+ private ThriftGetter(EntryMetaData[] schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public ByteBuffer getColumn(ColumnOrSuperColumn element) {
+ return element.getColumn().bufferForName();
+ }
+
+ @Override
+ public ByteBuffer getValue(ColumnOrSuperColumn element) {
+ return element.getColumn().bufferForValue();
+ }
+
+ @Override
+ public EntryMetaData[] getMetaSchema(ColumnOrSuperColumn element) {
+ return schema;
+ }
+
+ @Override
+ public Object getMetaData(ColumnOrSuperColumn element, EntryMetaData meta) {
+ switch(meta) {
+ case TIMESTAMP:
+ return element.getColumn().getTimestamp();
+ case TTL:
+ return element.getColumn().getTtl();
+ default:
+ throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue,
+ StoreTransaction txh) throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
+ final IPartitioner partitioner = storeManager.getCassandraPartitioner();
+
+ if (!(partitioner instanceof RandomPartitioner) && !(partitioner instanceof Murmur3Partitioner))
+ throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
+
+ try {
+ return new AllTokensIterator(partitioner, sliceQuery, storeManager.getPageSize());
+ } catch (Exception e) {
+ throw convertException(e);
+ }
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
+ final IPartitioner partitioner = storeManager.getCassandraPartitioner();
+
+ // see rant about the reason of this limitation in Astyanax implementation of this method.
+ if (!(partitioner instanceof AbstractByteOrderedPartitioner))
+ throw new PermanentBackendException("This operation is only allowed when byte-ordered partitioner is used.");
+
+ try {
+ return new KeyRangeIterator(partitioner, keyRangeQuery, storeManager.getPageSize(),
+ keyRangeQuery.getKeyStart().asByteBuffer(),
+ keyRangeQuery.getKeyEnd().asByteBuffer());
+ } catch (Exception e) {
+ throw convertException(e);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return columnFamily;
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
+ mutateMany(mutations, txh);
+ }
+
+ public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+ storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
+ }
+
+ static BackendException convertException(Throwable e) {
+ if (e instanceof TException) {
+ return new PermanentBackendException(e);
+ } else if (e instanceof TimedOutException) {
+ return new TemporaryBackendException(e);
+ } else if (e instanceof UnavailableException) {
+ return new TemporaryBackendException(e);
+ } else if (e instanceof InvalidRequestException) {
+ return new PermanentBackendException(e);
+ } else {
+ return new PermanentBackendException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "CassandraThriftKeyColumnValueStore[ks="
+ + keyspace + ", cf=" + columnFamily + "]";
+ }
+
+
+ private List<KeySlice> getKeySlice(ByteBuffer startKey,
+ ByteBuffer endKey,
+ SliceQuery columnSlice,
+ int count) throws BackendException {
+ return getRangeSlices(new org.apache.cassandra.thrift.KeyRange().setStart_key(startKey).setEnd_key(endKey).setCount(count), columnSlice);
+ }
+
+ private <T extends Token> List<KeySlice> getTokenSlice(T startToken, T endToken,
+ SliceQuery sliceQuery, int count) throws BackendException {
+
+ String st = sanitizeBrokenByteToken(startToken);
+ String et = sanitizeBrokenByteToken(endToken);
+
+ org.apache.cassandra.thrift.KeyRange kr = new org.apache.cassandra.thrift.KeyRange().setStart_token(st).setEnd_token(et).setCount(count);
+
+ return getRangeSlices(kr, sliceQuery);
+ }
+
+ private String sanitizeBrokenByteToken(Token tok) {
+ /*
+ * Background: https://issues.apache.org/jira/browse/CASSANDRA-5566
+ *
+ * This check is useful for compatibility with Cassandra server versions
+ * 1.2.4 and earlier.
+ */
+ String st = tok.toString();
+ if (!(tok instanceof BytesToken))
+ return st;
+
+ // Do a cheap 1-character startsWith before unleashing the regex
+ if (st.startsWith("T")) {
+ Matcher m = BROKEN_BYTE_TOKEN_PATTERN.matcher(st);
+ if (!m.matches()) {
+ logger.warn("Unknown token string format: \"{}\"", st);
+ } else {
+ String old = st;
+ st = m.group(1);
+ logger.debug("Rewrote token string: \"{}\" -> \"{}\"", old, st);
+ }
+ }
+ return st;
+ }
+
+ private List<KeySlice> getRangeSlices(org.apache.cassandra.thrift.KeyRange keyRange, @Nullable SliceQuery sliceQuery) throws BackendException {
+ SliceRange sliceRange = new SliceRange();
+
+ if (sliceQuery == null) {
+ sliceRange.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setCount(5);
+ } else {
+ sliceRange.setStart(sliceQuery.getSliceStart().asByteBuffer())
+ .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
+ .setCount((sliceQuery.hasLimit()) ? sliceQuery.getLimit() : Integer.MAX_VALUE);
+ }
+
+
+ CTConnection connection = null;
+ try {
+ connection = pool.borrowObject(keyspace);
+
+ List<KeySlice> slices =
+ connection.getClient().get_range_slices(new ColumnParent(columnFamily),
+ new SlicePredicate()
+ .setSlice_range(sliceRange),
+ keyRange,
+ ConsistencyLevel.QUORUM);
+
+ for (KeySlice s : slices) {
+ logger.debug("Key {}", ByteBufferUtil.toString(s.key, "-"));
+ }
+
+ /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
+ List<KeySlice> result = new ArrayList<>(slices.size());
+ KeyIterationPredicate pred = new KeyIterationPredicate();
+ for (KeySlice ks : slices)
+ if (pred.apply(ks))
+ result.add(ks);
+ return result;
+ } catch (Exception e) {
+ throw convertException(e);
+ } finally {
+ if (connection != null)
+ pool.returnObjectUnsafe(keyspace, connection);
+ }
+ }
+
+ private static class KeyIterationPredicate implements Predicate<KeySlice> {
+
+ @Override
+ public boolean apply(@Nullable KeySlice row) {
+ return (row != null) && row.getColumns().size() > 0;
+ }
+ }
+
+ /**
+ * Slices rows and columns using tokens. Recall that the partitioner turns
+ * keys into tokens. For instance, under RandomPartitioner, tokens are the
+ * MD5 hashes of keys.
+ */
+ public class AbstractBufferedRowIter implements KeyIterator {
+
+ private final int pageSize;
+ private final SliceQuery columnSlice;
+
+ private boolean isClosed;
+ private boolean seenEnd;
+ protected Iterator<KeySlice> ksIter;
+ private KeySlice mostRecentRow;
+
+ private final IPartitioner partitioner;
+ private Token nextStartToken;
+ private final Token endToken;
+ private ByteBuffer nextStartKey;
+
+ private boolean omitEndToken;
+
+ public AbstractBufferedRowIter(IPartitioner partitioner,
+ SliceQuery columnSlice, int pageSize, Token startToken, Token endToken, boolean omitEndToken) {
+ this.pageSize = pageSize;
+ this.partitioner = partitioner;
+ this.nextStartToken = startToken;
+ this.endToken = endToken;
+ this.columnSlice = columnSlice;
+
+ this.seenEnd = false;
+ this.isClosed = false;
+ this.ksIter = Iterators.emptyIterator();
+ this.mostRecentRow = null;
+ this.omitEndToken = omitEndToken;
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+
+ if (!ksIter.hasNext() && !seenEnd) {
+ try {
+ ksIter = rebuffer().iterator();
+ } catch (BackendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return ksIter.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ mostRecentRow = ksIter.next();
+
+ Preconditions.checkNotNull(mostRecentRow);
+ return StaticArrayBuffer.of(mostRecentRow.bufferForKey());
+ }
+
+ @Override
+ public void close() {
+ closeIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RecordIterator<Entry> getEntries() {
+ ensureOpen();
+
+ return new RecordIterator<Entry>() {
+ final Iterator<Entry> columns =
+ CassandraHelper.makeEntryIterator(mostRecentRow.getColumns(),
+ entryGetter, columnSlice.getSliceEnd(),
+ columnSlice.getLimit());
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return columns.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ return columns.next();
+ }
+
+ @Override
+ public void close() {
+ closeIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private void ensureOpen() {
+ if (isClosed)
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+
+ private void closeIterator() {
+ if (!isClosed) {
+ isClosed = true;
+ }
+ }
+
+ private List<KeySlice> rebuffer() throws BackendException {
+
+ Preconditions.checkArgument(!seenEnd);
+
+ return checkFreshSlices(getNextKeySlices());
+ }
+
+ protected List<KeySlice> checkFreshSlices(List<KeySlice> ks) {
+
+ if (0 == ks.size()) {
+ seenEnd = true;
+ return Collections.emptyList();
+ }
+
+ nextStartKey = ks.get(ks.size() - 1).bufferForKey();
+ nextStartToken = partitioner.getToken(nextStartKey);
+
+ if (nextStartToken.equals(endToken)) {
+ seenEnd = true;
+ if (omitEndToken)
+ ks.remove(ks.size() - 1);
+ }
+
+ return ks;
+ }
+
+ protected final List<KeySlice> getNextKeySlices() throws BackendException {
+ return getTokenSlice(nextStartToken, endToken, columnSlice, pageSize);
+ }
+ }
+
+ private final class AllTokensIterator extends AbstractBufferedRowIter {
+ public AllTokensIterator(IPartitioner partitioner, SliceQuery columnSlice, int pageSize) {
+ super(partitioner, columnSlice, pageSize, partitioner.getMinimumToken(), partitioner.getMinimumToken(), false);
+ }
+ }
+
+ private final class KeyRangeIterator extends AbstractBufferedRowIter {
+ public KeyRangeIterator(IPartitioner partitioner, SliceQuery columnSlice,
+ int pageSize, ByteBuffer startKey, ByteBuffer endKey) throws BackendException {
+ super(partitioner, columnSlice, pageSize, partitioner.getToken(startKey), partitioner.getToken(endKey), true);
+
+ Preconditions.checkArgument(partitioner instanceof AbstractByteOrderedPartitioner);
+
+ // Get first slice with key range instead of token range. Token
+ // ranges are start-exclusive, key ranges are start-inclusive. Both
+ // are end-inclusive. If we don't make the call below, then we will
+ // erroneously miss startKey.
+ List<KeySlice> ks = getKeySlice(startKey, endKey, columnSlice, pageSize);
+
+ this.ksIter = checkFreshSlices(ks).iterator();
+ }
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java
new file mode 100644
index 0000000..d904860
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java
@@ -0,0 +1,621 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.thinkaurelius.titan.diskstorage.EntryMetaData;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions;
+import com.thinkaurelius.titan.util.system.NetworkUtil;
+
+import org.apache.cassandra.dht.AbstractByteOrderedPartitioner;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+
+import static com.thinkaurelius.titan.diskstorage.configuration.ConfigOption.disallowEmpty;
+
+/**
+ * This class creates {@see CassandraThriftKeyColumnValueStore}s and
+ * handles Cassandra-backed allocation of vertex IDs for Titan (when so
+ * configured).
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+@PreInitializeConfigOptions
+public class CassandraThriftStoreManager extends AbstractCassandraStoreManager {
+
+ public enum PoolExhaustedAction {
+ BLOCK(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK),
+ FAIL(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL),
+ GROW(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW);
+
+ private final byte b;
+
+ PoolExhaustedAction(byte b) {
+ this.b = b;
+ }
+
+ public byte getByte() {
+ return b;
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraThriftStoreManager.class);
+
+ public static final ConfigNamespace THRIFT_NS =
+ new ConfigNamespace(AbstractCassandraStoreManager.CASSANDRA_NS, "thrift",
+ "Options for Titan's own Thrift Cassandra backend");
+
+ public static final ConfigNamespace CPOOL_NS =
+ new ConfigNamespace(THRIFT_NS, "cpool", "Options for the Apache commons-pool connection manager");
+
+ public static final ConfigOption<String> CPOOL_WHEN_EXHAUSTED =
+ new ConfigOption<>(CPOOL_NS, "when-exhausted",
+ "What to do when clients concurrently request more active connections than are allowed " +
+ "by the pool. The value must be one of BLOCK, FAIL, or GROW.",
+ ConfigOption.Type.MASKABLE, String.class, PoolExhaustedAction.BLOCK.toString(),
+ disallowEmpty(String.class));
+
+ public static final ConfigOption<Integer> CPOOL_MAX_TOTAL =
+ new ConfigOption<Integer>(CPOOL_NS, "max-total",
+ "Max number of allowed Thrift connections, idle or active (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, -1);
+
+ public static final ConfigOption<Integer> CPOOL_MAX_ACTIVE =
+ new ConfigOption<Integer>(CPOOL_NS, "max-active",
+ "Maximum number of concurrently in-use connections (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, 16);
+
+ public static final ConfigOption<Integer> CPOOL_MAX_IDLE =
+ new ConfigOption<Integer>(CPOOL_NS, "max-idle",
+ "Maximum number of concurrently idle connections (-1 to leave undefined)",
+ ConfigOption.Type.MASKABLE, 4);
+
+ public static final ConfigOption<Integer> CPOOL_MIN_IDLE =
+ new ConfigOption<Integer>(CPOOL_NS, "min-idle",
+ "Minimum number of idle connections the pool attempts to maintain",
+ ConfigOption.Type.MASKABLE, 0);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_MAX_WAIT =
+ new ConfigOption<Long>(CPOOL_NS, "max-wait",
+ "Maximum number of milliseconds to block when " + ConfigElement.getPath(CPOOL_WHEN_EXHAUSTED) +
+ " is set to BLOCK. Has no effect when set to actions besides BLOCK. Set to -1 to wait indefinitely.",
+ ConfigOption.Type.MASKABLE, -1L);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_EVICTOR_PERIOD =
+ new ConfigOption<Long>(CPOOL_NS, "evictor-period",
+ "Approximate number of milliseconds between runs of the idle connection evictor. " +
+ "Set to -1 to never run the idle connection evictor.",
+ ConfigOption.Type.MASKABLE, 30L * 1000L);
+
+ // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration
+ public static final ConfigOption<Long> CPOOL_MIN_EVICTABLE_IDLE_TIME =
+ new ConfigOption<Long>(CPOOL_NS, "min-evictable-idle-time",
+ "Minimum number of milliseconds a connection must be idle before it is eligible for " +
+ "eviction. See also " + ConfigElement.getPath(CPOOL_EVICTOR_PERIOD) + ". Set to -1 to never evict " +
+ "idle connections.", ConfigOption.Type.MASKABLE, 60L * 1000L);
+
+ public static final ConfigOption<Boolean> CPOOL_IDLE_TESTS =
+ new ConfigOption<Boolean>(CPOOL_NS, "idle-test",
+ "Whether the idle connection evictor validates idle connections and drops those that fail to validate",
+ ConfigOption.Type.MASKABLE, false);
+
+ public static final ConfigOption<Integer> CPOOL_IDLE_TESTS_PER_EVICTION_RUN =
+ new ConfigOption<Integer>(CPOOL_NS, "idle-tests-per-eviction-run",
+ "When the value is negative, e.g. -n, roughly one nth of the idle connections are tested per run. " +
+ "When the value is positive, e.g. n, the min(idle-count, n) connections are tested per run.",
+ ConfigOption.Type.MASKABLE, 0);
+
+
+ private final Map<String, CassandraThriftKeyColumnValueStore> openStores;
+ private final CTConnectionPool pool;
+ private final Deployment deployment;
+
+ public CassandraThriftStoreManager(Configuration config) throws BackendException {
+ super(config);
+
+ /*
+ * This is eventually passed to Thrift's TSocket constructor. The
+ * constructor parameter is of type int.
+ */
+ int thriftTimeoutMS = (int)config.get(GraphDatabaseConfiguration.CONNECTION_TIMEOUT).toMillis();
+
+ CTConnectionFactory.Config factoryConfig = new CTConnectionFactory.Config(hostnames, port, username, password)
+ .setTimeoutMS(thriftTimeoutMS)
+ .setFrameSize(thriftFrameSizeBytes);
+
+ if (config.get(SSL_ENABLED)) {
+ factoryConfig.setSSLTruststoreLocation(config.get(SSL_TRUSTSTORE_LOCATION));
+ factoryConfig.setSSLTruststorePassword(config.get(SSL_TRUSTSTORE_PASSWORD));
+ }
+
+ final PoolExhaustedAction poolExhaustedAction = ConfigOption.getEnumValue(
+ config.get(CPOOL_WHEN_EXHAUSTED), PoolExhaustedAction.class);
+
+ CTConnectionPool p = new CTConnectionPool(factoryConfig.build());
+ p.setTestOnBorrow(true);
+ p.setTestOnReturn(true);
+ p.setTestWhileIdle(config.get(CPOOL_IDLE_TESTS));
+ p.setNumTestsPerEvictionRun(config.get(CPOOL_IDLE_TESTS_PER_EVICTION_RUN));
+ p.setWhenExhaustedAction(poolExhaustedAction.getByte());
+ p.setMaxActive(config.get(CPOOL_MAX_ACTIVE));
+ p.setMaxTotal(config.get(CPOOL_MAX_TOTAL)); // maxTotal limits active + idle
+ p.setMaxIdle(config.get(CPOOL_MAX_IDLE));
+ p.setMinIdle(config.get(CPOOL_MIN_IDLE));
+ p.setMaxWait(config.get(CPOOL_MAX_WAIT));
+ p.setTimeBetweenEvictionRunsMillis(config.get(CPOOL_EVICTOR_PERIOD));
+ p.setMinEvictableIdleTimeMillis(config.get(CPOOL_MIN_EVICTABLE_IDLE_TIME));
+ this.pool = p;
+
+ this.openStores = new HashMap<String, CassandraThriftKeyColumnValueStore>();
+
+ // Only watch the ring and change endpoints with BOP
+ if (getCassandraPartitioner() instanceof ByteOrderedPartitioner) {
+ deployment = (hostnames.length == 1)// mark deployment as local only in case we have byte ordered partitioner and local connection
+ ? (NetworkUtil.isLocalConnection(hostnames[0])) ? Deployment.LOCAL : Deployment.REMOTE
+ : Deployment.REMOTE;
+ } else {
+ deployment = Deployment.REMOTE;
+ }
+ }
+
+ @Override
+ public Deployment getDeployment() {
+ return deployment;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public IPartitioner getCassandraPartitioner() throws BackendException {
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(SYSTEM_KS);
+ return FBUtilities.newPartitioner(conn.getClient().describe_partitioner());
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(SYSTEM_KS, conn);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "thriftCassandra" + super.toString();
+ }
+
+ @Override
+ public void close() throws BackendException {
+ openStores.clear();
+ closePool();
+ }
+
+ @Override
+ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
+ Preconditions.checkNotNull(mutations);
+
+ final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
+
+ ConsistencyLevel consistency = getTx(txh).getWriteConsistencyLevel().getThrift();
+
+ // Generate Thrift-compatible batch_mutate() datastructure
+ // key -> cf -> cassmutation
+ int size = 0;
+ for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size();
+ Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch =
+ new HashMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>>(size);
+
+
+ for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) {
+ String columnFamily = keyMutation.getKey();
+ for (Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) {
+ ByteBuffer keyBB = mutEntry.getKey().asByteBuffer();
+
+ // Get or create the single Cassandra Mutation object responsible for this key
+ Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation = batch.get(keyBB);
+ if (cfmutation == null) {
+ cfmutation = new HashMap<String, List<org.apache.cassandra.thrift.Mutation>>(3); // Most mutations only modify the edgeStore and indexStore
+ batch.put(keyBB, cfmutation);
+ }
+
+ KCVMutation mutation = mutEntry.getValue();
+ List<org.apache.cassandra.thrift.Mutation> thriftMutation =
+ new ArrayList<org.apache.cassandra.thrift.Mutation>(mutations.size());
+
+ if (mutation.hasDeletions()) {
+ for (StaticBuffer buf : mutation.getDeletions()) {
+ Deletion d = new Deletion();
+ SlicePredicate sp = new SlicePredicate();
+ sp.addToColumn_names(buf.as(StaticBuffer.BB_FACTORY));
+ d.setPredicate(sp);
+ d.setTimestamp(commitTime.getDeletionTime(times));
+ org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
+ m.setDeletion(d);
+ thriftMutation.add(m);
+ }
+ }
+
+ if (mutation.hasAdditions()) {
+ for (Entry ent : mutation.getAdditions()) {
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY));
+ column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY));
+
+ column.setTimestamp(commitTime.getAdditionTime(times));
+
+ Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL);
+ if (null != ttl && ttl > 0) {
+ column.setTtl(ttl);
+ }
+
+ cosc.setColumn(column);
+ org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation();
+ m.setColumn_or_supercolumn(cosc);
+ thriftMutation.add(m);
+ }
+ }
+
+ cfmutation.put(columnFamily, thriftMutation);
+ }
+ }
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(keySpaceName);
+ Cassandra.Client client = conn.getClient();
+ if (atomicBatch) {
+ client.atomic_batch_mutate(batch, consistency);
+ } else {
+ client.batch_mutate(batch, consistency);
+ }
+ } catch (Exception ex) {
+ throw CassandraThriftKeyColumnValueStore.convertException(ex);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+
+ sleepAfterWrite(txh, commitTime);
+ }
+
+ @Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap
+ public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, StoreMetaData.Container metaData) throws BackendException {
+ if (openStores.containsKey(name))
+ return openStores.get(name);
+
+ ensureColumnFamilyExists(keySpaceName, name);
+
+ CassandraThriftKeyColumnValueStore store = new CassandraThriftKeyColumnValueStore(keySpaceName, name, this, pool);
+ openStores.put(name, store);
+ return store;
+ }
+
+ @Override
+ public List<KeyRange> getLocalKeyPartition() throws BackendException {
+ CTConnection conn = null;
+ IPartitioner partitioner = getCassandraPartitioner();
+
+ if (!(partitioner instanceof AbstractByteOrderedPartitioner))
+ throw new UnsupportedOperationException("getLocalKeyPartition() only supported by byte ordered partitioner.");
+
+ Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
+
+ try {
+ // Resist the temptation to describe SYSTEM_KS. It has no ring.
+ // Instead, we'll create our own keyspace (or check that it exists), then describe it.
+ ensureKeyspaceExists(keySpaceName);
+
+ conn = pool.borrowObject(keySpaceName);
+ List<TokenRange> ranges = conn.getClient().describe_ring(keySpaceName);
+ List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size());
+
+ for (TokenRange range : ranges) {
+ if (!NetworkUtil.hasLocalAddress(range.endpoints))
+ continue;
+
+ keyRanges.add(CassandraHelper.transformRange(tokenFactory.fromString(range.start_token), tokenFactory.fromString(range.end_token)));
+ }
+
+ return keyRanges;
+ } catch (Exception e) {
+ throw CassandraThriftKeyColumnValueStore.convertException(e);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+ }
+
+ /**
+ * Connect to Cassandra via Thrift on the specified host and port and attempt to truncate the named keyspace.
+ * <p/>
+ * This is a utility method intended mainly for testing. It is
+ * equivalent to issuing 'truncate <cf>' for each of the column families in keyspace using
+ * the cassandra-cli tool.
+ * <p/>
+ * Using truncate is better for a number of reasons, most significantly because it doesn't
+ * involve any schema modifications which can take time to propagate across the cluster such
+ * leaves nodes in the inconsistent state and could result in read/write failures.
+ * Any schema modifications are discouraged until there is no traffic to Keyspace or ColumnFamilies.
+ *
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException if any checked Thrift or UnknownHostException is thrown in the body of this method
+ */
+ public void clearStorage() throws BackendException {
+ openStores.clear();
+ final String lp = "ClearStorage: "; // "log prefix"
+ /*
+ * log4j is capable of automatically writing the name of a method that
+ * generated a log message, but the docs warn that "generating caller
+ * location information is extremely slow and should be avoided unless
+ * execution speed is not an issue."
+ */
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(SYSTEM_KS);
+ Cassandra.Client client = conn.getClient();
+
+ KsDef ksDef;
+ try {
+ client.set_keyspace(keySpaceName);
+ ksDef = client.describe_keyspace(keySpaceName);
+ } catch (NotFoundException e) {
+ log.debug(lp + "Keyspace {} does not exist, not attempting to truncate.", keySpaceName);
+ return;
+ } catch (InvalidRequestException e) {
+ log.debug(lp + "InvalidRequestException when attempting to describe keyspace {}, not attempting to truncate.", keySpaceName);
+ return;
+ }
+
+
+ if (null == ksDef) {
+ log.debug(lp + "Received null KsDef for keyspace {}; not truncating its CFs", keySpaceName);
+ return;
+ }
+
+ List<CfDef> cfDefs = ksDef.getCf_defs();
+
+ if (null == cfDefs) {
+ log.debug(lp + "Received empty CfDef list for keyspace {}; not truncating CFs", keySpaceName);
+ return;
+ }
+
+ for (CfDef cfDef : ksDef.getCf_defs()) {
+ client.truncate(cfDef.name);
+ log.info(lp + "Truncated CF {} in keyspace {}", cfDef.name, keySpaceName);
+ }
+
+ /*
+ * Clearing the CTConnectionPool is unnecessary. This method
+ * removes no keyspaces. All open Cassandra connections will
+ * remain valid.
+ */
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ if (conn != null && conn.getClient() != null) {
+ try {
+ conn.getClient().set_keyspace(SYSTEM_KS);
+ } catch (InvalidRequestException e) {
+ log.warn("Failed to reset keyspace", e);
+ } catch (TException e) {
+ log.warn("Failed to reset keyspace", e);
+ }
+ }
+ pool.returnObjectUnsafe(SYSTEM_KS, conn);
+ }
+ }
+
+ private KsDef ensureKeyspaceExists(String keyspaceName) throws TException, BackendException {
+ CTConnection connection = null;
+
+ try {
+ connection = pool.borrowObject(SYSTEM_KS);
+ Cassandra.Client client = connection.getClient();
+
+ try {
+ // Side effect: throws Exception if keyspaceName doesn't exist
+ client.set_keyspace(keyspaceName); // Don't remove
+ client.set_keyspace(SYSTEM_KS);
+ log.debug("Found existing keyspace {}", keyspaceName);
+ } catch (InvalidRequestException e) {
+ // Keyspace didn't exist; create it
+ log.debug("Creating keyspace {}...", keyspaceName);
+
+ KsDef ksdef = new KsDef().setName(keyspaceName)
+ .setCf_defs(new LinkedList<CfDef>()) // cannot be null but can be empty
+ .setStrategy_class(storageConfig.get(REPLICATION_STRATEGY))
+ .setStrategy_options(strategyOptions);
+
+ client.set_keyspace(SYSTEM_KS);
+ try {
+ client.system_add_keyspace(ksdef);
+ retrySetKeyspace(keyspaceName, client);
+ log.debug("Created keyspace {}", keyspaceName);
+ } catch (InvalidRequestException ire) {
+ log.error("system_add_keyspace failed for keyspace=" + keyspaceName, ire);
+ throw ire;
+ }
+
+ }
+
+ return client.describe_keyspace(keyspaceName);
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(SYSTEM_KS, connection);
+ }
+ }
+
+ private void retrySetKeyspace(String ksName, Cassandra.Client client) throws BackendException {
+ final long end = System.currentTimeMillis() + (60L * 1000L);
+
+ while (System.currentTimeMillis() <= end) {
+ try {
+ client.set_keyspace(ksName);
+ return;
+ } catch (Exception e) {
+ log.warn("Exception when changing to keyspace {} after creating it", ksName, e);
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ie) {
+ throw new PermanentBackendException("Unexpected interrupt (shutting down?)", ie);
+ }
+ }
+ }
+
+ throw new PermanentBackendException("Could change to keyspace " + ksName + " after creating it");
+ }
+
+ private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException {
+ ensureColumnFamilyExists(ksName, cfName, "org.apache.cassandra.db.marshal.BytesType");
+ }
+
+ private void ensureColumnFamilyExists(String ksName, String cfName, String comparator) throws BackendException {
+ CTConnection conn = null;
+ try {
+ KsDef keyspaceDef = ensureKeyspaceExists(ksName);
+
+ conn = pool.borrowObject(ksName);
+ Cassandra.Client client = conn.getClient();
+
+ log.debug("Looking up metadata on keyspace {}...", ksName);
+
+ boolean foundColumnFamily = false;
+ for (CfDef cfDef : keyspaceDef.getCf_defs()) {
+ String curCfName = cfDef.getName();
+ if (curCfName.equals(cfName))
+ foundColumnFamily = true;
+ }
+
+ if (!foundColumnFamily) {
+ createColumnFamily(client, ksName, cfName, comparator);
+ } else {
+ log.debug("Keyspace {} and ColumnFamily {} were found.", ksName, cfName);
+ }
+ } catch (SchemaDisagreementException e) {
+ throw new TemporaryBackendException(e);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(ksName, conn);
+ }
+ }
+
+ private void createColumnFamily(Cassandra.Client client,
+ String ksName,
+ String cfName,
+ String comparator) throws BackendException {
+
+ CfDef createColumnFamily = new CfDef();
+ createColumnFamily.setName(cfName);
+ createColumnFamily.setKeyspace(ksName);
+ createColumnFamily.setComparator_type(comparator);
+
+ ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>();
+
+ if (compressionEnabled) {
+ compressionOptions.put("sstable_compression", compressionClass)
+ .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB));
+ }
+
+ createColumnFamily.setCompression_options(compressionOptions.build());
+
+ // Hard-coded caching settings
+ if (cfName.startsWith(Backend.EDGESTORE_NAME)) {
+ createColumnFamily.setCaching("keys_only");
+ } else if (cfName.startsWith(Backend.INDEXSTORE_NAME)) {
+ createColumnFamily.setCaching("rows_only");
+ }
+
+ log.debug("Adding column family {} to keyspace {}...", cfName, ksName);
+ try {
+ client.system_add_column_family(createColumnFamily);
+ } catch (SchemaDisagreementException e) {
+ throw new TemporaryBackendException("Error in setting up column family", e);
+ } catch (Exception e) {
+ throw new PermanentBackendException(e);
+ }
+
+ log.debug("Added column family {} to keyspace {}.", cfName, ksName);
+ }
+
+ @Override
+ public Map<String, String> getCompressionOptions(String cf) throws BackendException {
+ CTConnection conn = null;
+ Map<String, String> result = null;
+
+ try {
+ conn = pool.borrowObject(keySpaceName);
+ Cassandra.Client client = conn.getClient();
+
+ KsDef ksDef = client.describe_keyspace(keySpaceName);
+
+ for (CfDef cfDef : ksDef.getCf_defs()) {
+ if (null != cfDef && cfDef.getName().equals(cf)) {
+ result = cfDef.getCompression_options();
+ break;
+ }
+ }
+
+ return result;
+ } catch (InvalidRequestException e) {
+ log.debug("Keyspace {} does not exist", keySpaceName);
+ return null;
+ } catch (Exception e) {
+ throw new TemporaryBackendException(e);
+ } finally {
+ pool.returnObjectUnsafe(keySpaceName, conn);
+ }
+ }
+
+ private void closePool() {
+ /*
+ * pool.close() does not affect borrowed connections.
+ *
+ * Connections currently borrowed by some thread which are
+ * talking to the old host will eventually be destroyed by
+ * CTConnectionFactory#validateObject() returning false when
+ * those connections are returned to the pool.
+ */
+ try {
+ pool.close();
+ log.info("Closed Thrift connection pooler.");
+ } catch (Exception e) {
+ log.warn("Failed to close connection pooler. "
+ + "We might be leaking Cassandra connections.", e);
+ // There's still hope: CTConnectionFactory#validateObject()
+ // will be called on borrow() and might tear down the
+ // connections that close() failed to tear down
+ }
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java
new file mode 100644
index 0000000..3657fb9
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java
@@ -0,0 +1,55 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Cassandra.Client;
+import org.apache.thrift.transport.TTransport;
+
+import java.io.Closeable;
+
+/**
+ * Wraps a {@code Cassandra.Client} instance, its underlying {@code TTransport}
+ * instance, and the {@link com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.Config} instance used to setup
+ * the connection.
+ *
+ * @see CTConnectionFactory
+ * @see CTConnectionPool
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class CTConnection implements Closeable {
+
+ private final TTransport transport;
+ private final Cassandra.Client client;
+ private final CTConnectionFactory.Config cfg;
+
+ public CTConnection(TTransport transport, Client client, CTConnectionFactory.Config cfg) {
+ this.transport = transport;
+ this.client = client;
+ this.cfg = cfg;
+ }
+
+ public TTransport getTransport() {
+ return transport;
+ }
+
+ public Cassandra.Client getClient() {
+ return client;
+ }
+
+ public CTConnectionFactory.Config getConfig() {
+ return cfg;
+ }
+
+ public boolean isOpen() {
+ return transport.isOpen();
+ }
+ @Override
+ public void close() {
+ if (transport != null && transport.isOpen())
+ transport.close();
+ }
+
+ @Override
+ public String toString() {
+ return "CTConnection [transport=" + transport + ", client=" + client + ", cfg=" + cfg + "]";
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java
new file mode 100644
index 0000000..1c60cfd
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java
@@ -0,0 +1,210 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.thrift.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A factory compatible with Apache commons-pool for Cassandra Thrift API
+ * connections.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class CTConnectionFactory implements KeyedPoolableObjectFactory<String, CTConnection> {
+
+ private static final Logger log = LoggerFactory.getLogger(CTConnectionFactory.class);
+ private static final long SCHEMA_WAIT_MAX = 5000L;
+ private static final long SCHEMA_WAIT_INCREMENT = 25L;
+
+ private final AtomicReference<Config> cfgRef;
+
+ private CTConnectionFactory(Config config) {
+ this.cfgRef = new AtomicReference<Config>(config);
+ }
+
+ @Override
+ public void activateObject(String key, CTConnection c) throws Exception {
+ // Do nothing, as in passivateObject
+ }
+
+ @Override
+ public void destroyObject(String key, CTConnection c) throws Exception {
+ TTransport t = c.getTransport();
+
+ if (t.isOpen()) {
+ t.close();
+ log.trace("Closed transport {}", t);
+ } else {
+ log.trace("Not closing transport {} (already closed)", t);
+ }
+ }
+
+ @Override
+ public CTConnection makeObject(String key) throws Exception {
+ CTConnection conn = makeRawConnection();
+ Cassandra.Client client = conn.getClient();
+ client.set_keyspace(key);
+
+ return conn;
+ }
+
+ /**
+ * Create a Cassandra-Thrift connection, but do not attempt to
+ * set a keyspace on the connection.
+ *
+ * @return A CTConnection ready to talk to a Cassandra cluster
+ * @throws TTransportException on any Thrift transport failure
+ */
+ public CTConnection makeRawConnection() throws TTransportException {
+ final Config cfg = cfgRef.get();
+
+ String hostname = cfg.getRandomHost();
+
+ log.debug("Creating TSocket({}, {}, {}, {}, {})", hostname, cfg.port, cfg.username, cfg.password, cfg.timeoutMS);
+
+ TSocket socket;
+ if (null != cfg.sslTruststoreLocation && !cfg.sslTruststoreLocation.isEmpty()) {
+ TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters() {{
+ setTrustStore(cfg.sslTruststoreLocation, cfg.sslTruststorePassword);
+ }};
+ socket = TSSLTransportFactory.getClientSocket(hostname, cfg.port, cfg.timeoutMS, params);
+ } else {
+ socket = new TSocket(hostname, cfg.port, cfg.timeoutMS);
+ }
+
+ TTransport transport = new TFramedTransport(socket, cfg.frameSize);
+ log.trace("Created transport {}", transport);
+ TBinaryProtocol protocol = new TBinaryProtocol(transport);
+ Cassandra.Client client = new Cassandra.Client(protocol);
+ if (!transport.isOpen()) {
+ transport.open();
+ }
+
+ if (cfg.username != null) {
+ Map<String, String> credentials = new HashMap<String, String>() {{
+ put(IAuthenticator.USERNAME_KEY, cfg.username);
+ put(IAuthenticator.PASSWORD_KEY, cfg.password);
+ }};
+
+ try {
+ client.login(new AuthenticationRequest(credentials));
+ } catch (Exception e) { // TTransportException will propagate authentication/authorization failure
+ throw new TTransportException(e);
+ }
+ }
+ return new CTConnection(transport, client, cfg);
+ }
+
+ @Override
+ public void passivateObject(String key, CTConnection o) throws Exception {
+ // Do nothing, as in activateObject
+ }
+
+ @Override
+ public boolean validateObject(String key, CTConnection c) {
+ Config curCfg = cfgRef.get();
+
+ boolean isSameConfig = c.getConfig().equals(curCfg);
+ if (log.isDebugEnabled()) {
+ if (isSameConfig) {
+ log.trace("Validated {} by configuration {}", c, curCfg);
+ } else {
+ log.trace("Rejected {}; current config is {}; rejected connection config is {}",
+ c, curCfg, c.getConfig());
+ }
+ }
+
+ return isSameConfig && c.isOpen();
+ }
+
+ public static class Config {
+
+ private final String[] hostnames;
+ private final int port;
+ private final String username;
+ private final String password;
+ private final Random random;
+
+ private int timeoutMS;
+ private int frameSize;
+
+ private String sslTruststoreLocation;
+ private String sslTruststorePassword;
+
+ private boolean isBuilt;
+
+ public Config(String[] hostnames, int port, String username, String password) {
+ this.hostnames = hostnames;
+ this.port = port;
+ this.username = username;
+ this.password = password;
+ this.random = new Random();
+ }
+
+ // TODO: we don't really need getters/setters here as all of the fields are final and immutable
+
+ public String getHostname() {
+ return hostnames[0];
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public String getRandomHost() {
+ return hostnames.length == 1 ? hostnames[0] : hostnames[random.nextInt(hostnames.length)];
+ }
+
+ public Config setTimeoutMS(int timeoutMS) {
+ checkIfAlreadyBuilt();
+ this.timeoutMS = timeoutMS;
+ return this;
+ }
+
+ public Config setFrameSize(int frameSize) {
+ checkIfAlreadyBuilt();
+ this.frameSize = frameSize;
+ return this;
+ }
+
+ public Config setSSLTruststoreLocation(String location) {
+ checkIfAlreadyBuilt();
+ this.sslTruststoreLocation = location;
+ return this;
+ }
+
+ public Config setSSLTruststorePassword(String password) {
+ checkIfAlreadyBuilt();
+ this.sslTruststorePassword = password;
+ return this;
+ }
+
+ public CTConnectionFactory build() {
+ isBuilt = true;
+ return new CTConnectionFactory(this);
+ }
+
+
+ public void checkIfAlreadyBuilt() {
+ if (isBuilt)
+ throw new IllegalStateException("Can't accept modifications when used with built factory.");
+ }
+
+ @Override
+ public String toString() {
+ return "Config[hostnames=" + StringUtils.join(hostnames, ',') + ", port=" + port
+ + ", timeoutMS=" + timeoutMS + ", frameSize=" + frameSize
+ + "]";
+ }
+ }
+
+}
+
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java
new file mode 100644
index 0000000..1af5630
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java
@@ -0,0 +1,64 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class extends Apache Commons Pool's GenericKeyedObjectPool, adding
+ * two methods that support Java 5 generic type safety. However, a
+ * programmer can still cause RuntimeExceptions related to type errors
+ * by mixing calls to these additional methods with calls to the legacy
+ * "Object"-typed methods.
+ * <p/>
+ * <p/>
+ * Unfortunately, GenericKeyedObjectPool is not actually generic in the
+ * type-system sense. All of its methods are typed to Object, forcing the
+ * client programmer to sprinkle code with casts. This class centralizes
+ * that casting to a single method.
+ * <p/>
+ * <p/>
+ * As a corollary, this class is slightly less flexible than
+ * GenericKeyedObjectPool, as this class can only store keys and pooled
+ * objects each of a single type, whereas GenericKeyedObjectPool could
+ * theoretically contain heterogeneous types of each. However, I do not
+ * need the flexibility of heterogeneous types for pooling Thrift
+ * connections, the original work that precipitated writing this class.
+ *
+ * @param <K> Key type
+ * @param <V> Pooled object type
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class CTConnectionPool extends GenericKeyedObjectPool<String, CTConnection> {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(CTConnectionPool.class);
+
+ public CTConnectionPool(KeyedPoolableObjectFactory<String, CTConnection> factory) {
+ super(factory);
+ }
+
+ /**
+ * If {@code conn} is non-null and is still open, then call
+ * {@link GenericKeyedObjectPool#returnObject(String, CTConnection),
+ * catching and logging and Exception that method might generate.
+ * This method does not emit any exceptions.
+ *
+ * @param keyspace The key of the pooled object being returned
+ * @param conn The pooled object being returned, or null to do nothing
+ */
+ public void returnObjectUnsafe(String keyspace, CTConnection conn) {
+ if (conn != null && conn.isOpen()) {
+ try {
+ returnObject(keyspace, conn);
+ } catch (Exception e) {
+ log.warn("Failed to return Cassandra connection to pool", e);
+ log.warn(
+ "Failure context: keyspace={}, pool={}, connection={}",
+ new Object[] { keyspace, this, conn });
+ }
+ }
+ }
+}
+
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java
new file mode 100644
index 0000000..acd87db
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java
@@ -0,0 +1,87 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.utils;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class starts a Thrift CassandraDaemon inside the current JVM. This class
+ * supports testing and shouldn't be used in production.
+ *
+ * This class starts Cassandra on the first invocation of
+ * {@link CassandraDaemonWrapper#start(String)} in the life of the JVM.
+ * Invocations after the first have no effect except that they may log a
+ * warning.
+ *
+ * When the thread that first called {@code #start(String)} dies, a daemon
+ * thread returns from {@link Thread#join()} and kills all embedded Cassandra
+ * threads in the JVM.
+ *
+ * This class once supported consecutive, idempotent calls to start(String) so
+ * long as the argument was the same in each invocation. It also once used
+ * refcounting to kill Cassandra's non-daemon threads once stop() was called as
+ * many times as start(). Some of Cassandra's background threads and statics
+ * can't be easily reset to allow a restart inside the same JVM, so this was
+ * intended as a one-use thing. However, this interacts poorly with the new
+ * KCVConfiguration system in titan-core. When KCVConfiguration is in use, core
+ * starts and stops each backend at least twice in the course of opening a
+ * single database instance. So the old refcounting and killing approach is out.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ */
+public class CassandraDaemonWrapper {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(CassandraDaemonWrapper.class);
+
+ private static String activeConfig;
+
+ private static boolean started;
+
+ public static synchronized void start(String config) {
+
+ if (started) {
+ if (null != config && !config.equals(activeConfig)) {
+ log.warn("Can't start in-process Cassandra instance " +
+ "with yaml path {} because an instance was " +
+ "previously started with yaml path {}",
+ config, activeConfig);
+ }
+
+ return;
+ }
+
+ started = true;
+
+ log.debug("Current working directory: {}", System.getProperty("user.dir"));
+
+ System.setProperty("cassandra.config", config);
+ // Prevent Cassandra from closing stdout/stderr streams
+ System.setProperty("cassandra-foreground", "yes");
+ // Prevent Cassandra from overwriting Log4J configuration
+ System.setProperty("log4j.defaultInitOverride", "false");
+
+ log.info("Starting cassandra with {}", config);
+
+ /*
+ * This main method doesn't block for any substantial length of time. It
+ * creates and starts threads and returns in relatively short order.
+ */
+ CassandraDaemon.main(new String[0]);
+
+ activeConfig = config;
+ }
+
+ public static synchronized boolean isStarted() {
+ return started;
+ }
+
+ public static void stop() {
+ // Do nothing
+ }
+}
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java
new file mode 100644
index 0000000..f8bd071
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java
@@ -0,0 +1,139 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.utils;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.diskstorage.EntryList;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange;
+import com.thinkaurelius.titan.diskstorage.util.BufferUtil;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList;
+import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import javax.annotation.Nullable;
+
+public class CassandraHelper {
+
+ public static List<ByteBuffer> convert(List<StaticBuffer> keys) {
+ List<ByteBuffer> requestKeys = new ArrayList<ByteBuffer>(keys.size());
+ for (int i = 0; i < keys.size(); i++) {
+ requestKeys.add(keys.get(i).asByteBuffer());
+ }
+ return requestKeys;
+ }
+
+ /**
+ * Constructs an {@link EntryList} from the Iterable of entries while excluding the end slice
+ * (since the method contract states that the end slice is exclusive, yet Cassandra treats it as
+ * inclusive) and respecting the limit.
+ *
+ * @param entries
+ * @param getter
+ * @param lastColumn TODO: make this StaticBuffer so we can avoid the conversion and provide equals method
+ * @param limit
+ * @param <E>
+ * @return
+ */
+ public static<E> EntryList makeEntryList(final Iterable<E> entries,
+ final StaticArrayEntry.GetColVal<E,ByteBuffer> getter,
+ final StaticBuffer lastColumn, final int limit) {
+ return StaticArrayEntryList.ofByteBuffer(new Iterable<E>() {
+ @Override
+ public Iterator<E> iterator() {
+ return Iterators.filter(entries.iterator(),new FilterResultColumns<E>(lastColumn,limit,getter));
+ }
+ },getter);
+ }
+
+ private static class FilterResultColumns<E> implements Predicate<E> {
+
+ private int count = 0;
+
+ private final int limit;
+ private final StaticBuffer lastColumn;
+ private final StaticArrayEntry.GetColVal<E,ByteBuffer> getter;
+
+ private FilterResultColumns(StaticBuffer lastColumn, int limit, StaticArrayEntry.GetColVal<E, ByteBuffer> getter) {
+ this.limit = limit;
+ this.lastColumn = lastColumn;
+ this.getter = getter;
+ }
+
+ @Override
+ public boolean apply(@Nullable E e) {
+ assert e!=null;
+ if (count>=limit || BufferUtil.equals(lastColumn, getter.getColumn(e))) return false;
+ count++;
+ return true;
+ }
+
+ }
+
+ public static<E> Iterator<Entry> makeEntryIterator(final Iterable<E> entries,
+ final StaticArrayEntry.GetColVal<E,ByteBuffer> getter,
+ final StaticBuffer lastColumn, final int limit) {
+ return Iterators.transform(Iterators.filter(entries.iterator(),
+ new FilterResultColumns<E>(lastColumn, limit, getter)), new Function<E, Entry>() {
+ @Nullable
+ @Override
+ public Entry apply(@Nullable E e) {
+ return StaticArrayEntry.ofByteBuffer(e,getter);
+ }
+ });
+ }
+
+
+ public static KeyRange transformRange(Range<Token> range) {
+ return transformRange(range.left, range.right);
+ }
+
+ public static KeyRange transformRange(Token leftKeyExclusive, Token rightKeyInclusive) {
+ if (!(leftKeyExclusive instanceof BytesToken))
+ throw new UnsupportedOperationException();
+
+ // if left part is BytesToken, right part should be too, otherwise there is no sense in the ring
+ assert rightKeyInclusive instanceof BytesToken;
+
+ // l is exclusive, r is inclusive
+ BytesToken l = (BytesToken) leftKeyExclusive;
+ BytesToken r = (BytesToken) rightKeyInclusive;
+
+ byte[] leftTokenValue = l.getTokenValue();
+ byte[] rightTokenValue = r.getTokenValue();
+
+ Preconditions.checkArgument(leftTokenValue.length == rightTokenValue.length, "Tokens have unequal length");
+ int tokenLength = leftTokenValue.length;
+
+ byte[][] tokens = new byte[][]{leftTokenValue, rightTokenValue};
+ byte[][] plusOne = new byte[2][tokenLength];
+
+ for (int j = 0; j < 2; j++) {
+ boolean carry = true;
+ for (int i = tokenLength - 1; i >= 0; i--) {
+ byte b = tokens[j][i];
+ if (carry) {
+ b++;
+ carry = false;
+ }
+ if (b == 0) carry = true;
+ plusOne[j][i] = b;
+ }
+ }
+
+ StaticBuffer lb = StaticArrayBuffer.of(plusOne[0]);
+ StaticBuffer rb = StaticArrayBuffer.of(plusOne[1]);
+ Preconditions.checkArgument(lb.length() == tokenLength, lb.length());
+ Preconditions.checkArgument(rb.length() == tokenLength, rb.length());
+
+ return new KeyRange(lb, rb);
+ }
+}
diff --git a/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java b/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java
new file mode 100644
index 0000000..1bb8f1f
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java
@@ -0,0 +1,21 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * This is necessary until Astyanax is updated to "officially" support Cassandra 2.0.x.
+ *
+ * The story is as follows: Cassandra 2.0.x moved to the new version of Thrift (0.9.x)
+ * where problem with TBinaryProtocol was fixed, so TBinaryProtocol class was removed as no longer necessary.
+ * Astyanax in it's current state still wants to use TBinaryProtocol bundled with Cassandra,
+ * so this class is essentially tricking it (Astyanax) into believing that class is still there.
+ *
+ * No other changes necessary to make Astyanax work with Cassandra 2.0.x because Thrift API is completely in-tact.
+ */
+
+@SuppressWarnings("unused")
+public class TBinaryProtocol extends org.apache.thrift.protocol.TBinaryProtocol {
+ public TBinaryProtocol(TTransport trans) {
+ super(trans);
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java b/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java
new file mode 100644
index 0000000..3b70533
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java
@@ -0,0 +1,182 @@
+package com.thinkaurelius.titan;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_KEYSPACE;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraDaemonWrapper;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*;
+
+public class CassandraStorageSetup {
+
+ public static final String CONFDIR_SYSPROP = "test.cassandra.confdir";
+ public static final String DATADIR_SYSPROP = "test.cassandra.datadir";
+
+ private static volatile Paths paths;
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraStorageSetup.class);
+
+ private static synchronized Paths getPaths() {
+ if (null == paths) {
+ String yamlPath = "file://" + loadAbsoluteDirectoryPath("conf", CONFDIR_SYSPROP, true) + File.separator + "cassandra.yaml";
+ String dataPath = loadAbsoluteDirectoryPath("data", DATADIR_SYSPROP, false);
+ paths = new Paths(yamlPath, dataPath);
+ }
+ return paths;
+ }
+
+ private static ModifiableConfiguration getGenericConfiguration(String ks, String backend) {
+ ModifiableConfiguration config = buildGraphConfiguration();
+ config.set(CASSANDRA_KEYSPACE, cleanKeyspaceName(ks));
+ log.debug("Set keyspace name: {}", config.get(CASSANDRA_KEYSPACE));
+ config.set(PAGE_SIZE,500);
+ config.set(CONNECTION_TIMEOUT, Duration.ofSeconds(60L));
+ config.set(STORAGE_BACKEND, backend);
+ return config;
+ }
+
+
+ public static ModifiableConfiguration getEmbeddedConfiguration(String ks) {
+ ModifiableConfiguration config = getGenericConfiguration(ks, "embeddedcassandra");
+ config.set(STORAGE_CONF_FILE, getPaths().yamlPath);
+ return config;
+ }
+
+ public static ModifiableConfiguration getEmbeddedCassandraPartitionConfiguration(String ks) {
+ ModifiableConfiguration config = getEmbeddedConfiguration(ks);
+ config.set(IDS_FLUSH,false);
+ return config;
+ }
+
+ public static WriteConfiguration getEmbeddedGraphConfiguration(String ks) {
+ return getEmbeddedConfiguration(ks).getConfiguration();
+ }
+
+ public static WriteConfiguration getEmbeddedCassandraPartitionGraphConfiguration(String ks) {
+ return getEmbeddedConfiguration(ks).getConfiguration();
+ }
+
+ public static ModifiableConfiguration getAstyanaxConfiguration(String ks) {
+ return getGenericConfiguration(ks, "astyanax");
+ }
+
+ public static ModifiableConfiguration getAstyanaxSSLConfiguration(String ks) {
+ return enableSSL(getGenericConfiguration(ks, "astyanax"));
+ }
+
+ public static WriteConfiguration getAstyanaxGraphConfiguration(String ks) {
+ return getAstyanaxConfiguration(ks).getConfiguration();
+ }
+
+ public static ModifiableConfiguration getCassandraConfiguration(String ks) {
+ return getGenericConfiguration(ks, "cassandra");
+ }
+
+ public static WriteConfiguration getCassandraGraphConfiguration(String ks) {
+ return getCassandraConfiguration(ks).getConfiguration();
+ }
+
+ public static ModifiableConfiguration getCassandraThriftConfiguration(String ks) {
+ return getGenericConfiguration(ks, "cassandrathrift");
+ }
+
+ public static ModifiableConfiguration getCassandraThriftSSLConfiguration(String ks) {
+ return enableSSL(getGenericConfiguration(ks, "cassandrathrift"));
+ }
+
+ public static WriteConfiguration getCassandraThriftGraphConfiguration(String ks) {
+ return getCassandraThriftConfiguration(ks).getConfiguration();
+ }
+
+ /**
+ * Load cassandra.yaml and data paths from the environment or from default
+ * values if nothing is set in the environment, then delete all existing
+ * data, and finally start Cassandra.
+ * <p>
+ * This method is idempotent. Calls after the first have no effect aside
+ * from logging statements.
+ */
+ public static void startCleanEmbedded() {
+ startCleanEmbedded(getPaths());
+ }
+
+ /*
+ * Cassandra only accepts keyspace names 48 characters long or shorter made
+ * up of alphanumeric characters and underscores.
+ */
+ public static String cleanKeyspaceName(String raw) {
+ Preconditions.checkNotNull(raw);
+ Preconditions.checkArgument(0 < raw.length());
+
+ if (48 < raw.length() || raw.matches("[^a-zA-Z_]")) {
+ return "strhash" + String.valueOf(Math.abs(raw.hashCode()));
+ } else {
+ return raw;
+ }
+ }
+
+ private static ModifiableConfiguration enableSSL(ModifiableConfiguration mc) {
+ mc.set(AbstractCassandraStoreManager.SSL_ENABLED, true);
+ mc.set(STORAGE_HOSTS, new String[]{ "localhost" });
+ mc.set(AbstractCassandraStoreManager.SSL_TRUSTSTORE_LOCATION,
+ Joiner.on(File.separator).join("target", "cassandra", "conf", "localhost-murmur-ssl", "test.truststore"));
+ mc.set(AbstractCassandraStoreManager.SSL_TRUSTSTORE_PASSWORD, "cassandra");
+ return mc;
+ }
+
+ private static void startCleanEmbedded(Paths p) {
+ if (!CassandraDaemonWrapper.isStarted()) {
+ try {
+ FileUtils.deleteDirectory(new File(p.dataPath));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ CassandraDaemonWrapper.start(p.yamlPath);
+ }
+
+ private static String loadAbsoluteDirectoryPath(String name, String prop, boolean mustExistAndBeAbsolute) {
+ String s = System.getProperty(prop);
+
+ if (null == s) {
+ s = Joiner.on(File.separator).join(System.getProperty("user.dir"), "target", "cassandra", name, "localhost-bop");
+ log.info("Set default Cassandra {} directory path {}", name, s);
+ } else {
+ log.info("Loaded Cassandra {} directory path {} from system property {}", new Object[] { name, s, prop });
+ }
+
+ if (mustExistAndBeAbsolute) {
+ File dir = new File(s);
+ Preconditions.checkArgument(dir.isDirectory(), "Path %s must be a directory", s);
+ Preconditions.checkArgument(dir.isAbsolute(), "Path %s must be absolute", s);
+ }
+
+ return s;
+ }
+
+ private static class Paths {
+ private final String yamlPath;
+ private final String dataPath;
+
+ public Paths(String yamlPath, String dataPath) {
+ this.yamlPath = yamlPath;
+ this.dataPath = dataPath;
+ }
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java
new file mode 100644
index 0000000..c77fed4
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java
@@ -0,0 +1,22 @@
+package com.thinkaurelius.titan.blueprints.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.blueprints.AbstractTitanGraphComputerProvider;
+import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraGraphComputer;
+import org.apache.tinkerpop.gremlin.GraphProvider;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@GraphProvider.Descriptor(computer = FulgoraGraphComputer.class)
+public class ThriftGraphComputerProvider extends AbstractTitanGraphComputerProvider {
+
+ @Override
+ public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
+ CassandraStorageSetup.startCleanEmbedded();
+ return CassandraStorageSetup.getCassandraThriftConfiguration(graphName);
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java
new file mode 100644
index 0000000..ce8ee76
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java
@@ -0,0 +1,18 @@
+package com.thinkaurelius.titan.blueprints.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public class ThriftGraphProvider extends AbstractTitanGraphProvider {
+
+ @Override
+ public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
+ CassandraStorageSetup.startCleanEmbedded();
+ return CassandraStorageSetup.getCassandraThriftConfiguration(graphName);
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java
new file mode 100644
index 0000000..3109b8a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.blueprints.thrift.process;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphComputerProvider;
+import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider;
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@RunWith(ProcessComputerSuite.class)
+@GraphProviderClass(provider = ThriftGraphComputerProvider.class, graph = TitanGraph.class)
+public class ThriftComputerTest {
+
+// TP3 ignores @BeforeClass -- the following method is never executed
+// @BeforeClass
+// public static void beforeSuite() {
+// CassandraStorageSetup.startCleanEmbedded();
+// }
+
+} \ No newline at end of file
diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java
new file mode 100644
index 0000000..0feb338
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.blueprints.thrift.process;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider;
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite;
+import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@RunWith(ProcessStandardSuite.class)
+@GraphProviderClass(provider = ThriftGraphProvider.class, graph = TitanGraph.class)
+public class ThriftProcessTest {
+
+// TP3 ignores @BeforeClass -- the following method is never executed
+// @BeforeClass
+// public static void beforeSuite() {
+// CassandraStorageSetup.startCleanEmbedded();
+// }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java
new file mode 100644
index 0000000..2d1d7eb
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java
@@ -0,0 +1,24 @@
+package com.thinkaurelius.titan.blueprints.thrift.structure;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider;
+import com.thinkaurelius.titan.core.TitanGraph;
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+@RunWith(StructureStandardSuite.class)
+@GraphProviderClass(provider = ThriftGraphProvider.class, graph = TitanGraph.class)
+public class ThriftStructureTest {
+
+// TP3 ignores @BeforeClass -- the following method is never executed
+// @BeforeClass
+// public static void beforeSuite() {
+// CassandraStorageSetup.startCleanEmbedded();
+// }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java
new file mode 100644
index 0000000..623a742
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java
@@ -0,0 +1,136 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.testcategory.OrderedKeyStoreTests;
+import com.thinkaurelius.titan.testcategory.UnorderedKeyStoreTests;
+
+public abstract class AbstractCassandraStoreTest extends KeyColumnValueStoreTest {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(AbstractCassandraStoreTest.class);
+ private static final String TEST_CF_NAME = "testcf";
+ private static final String DEFAULT_COMPRESSOR_PACKAGE = "org.apache.cassandra.io.compress";
+
+ public abstract ModifiableConfiguration getBaseStorageConfiguration();
+
+ public abstract AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException;
+
+ @Test
+ @Category({ UnorderedKeyStoreTests.class })
+ public void testUnorderedConfiguration() {
+ if (!manager.getFeatures().hasUnorderedScan()) {
+ log.warn(
+ "Can't test key-unordered features on incompatible store. "
+ + "This warning could indicate reduced test coverage and "
+ + "a broken JUnit configuration. Skipping test {}.",
+ name.getMethodName());
+ return;
+ }
+
+ StoreFeatures features = manager.getFeatures();
+ assertFalse(features.isKeyOrdered());
+ assertFalse(features.hasLocalKeyPartition());
+ }
+
+ @Test
+ @Category({ OrderedKeyStoreTests.class })
+ public void testOrderedConfiguration() {
+ if (!manager.getFeatures().hasOrderedScan()) {
+ log.warn(
+ "Can't test key-ordered features on incompatible store. "
+ + "This warning could indicate reduced test coverage and "
+ + "a broken JUnit configuration. Skipping test {}.",
+ name.getMethodName());
+ return;
+ }
+
+ StoreFeatures features = manager.getFeatures();
+ assertTrue(features.isKeyOrdered());
+ }
+
+ @Test
+ public void testDefaultCFCompressor() throws BackendException {
+
+ final String cf = TEST_CF_NAME + "_snappy";
+
+ AbstractCassandraStoreManager mgr = openStorageManager();
+
+ mgr.openDatabase(cf);
+
+ Map<String, String> defaultCfCompressionOps =
+ new ImmutableMap.Builder<String, String>()
+ .put("sstable_compression", DEFAULT_COMPRESSOR_PACKAGE + "." + AbstractCassandraStoreManager.CF_COMPRESSION_TYPE.getDefaultValue())
+ .put("chunk_length_kb", "64")
+ .build();
+
+ assertEquals(defaultCfCompressionOps, mgr.getCompressionOptions(cf));
+ }
+
+ @Test
+ public void testCustomCFCompressor() throws BackendException {
+
+ final String cname = "DeflateCompressor";
+ final int ckb = 128;
+ final String cf = TEST_CF_NAME + "_gzip";
+
+ ModifiableConfiguration config = getBaseStorageConfiguration();
+ config.set(AbstractCassandraStoreManager.CF_COMPRESSION_TYPE,cname);
+ config.set(AbstractCassandraStoreManager.CF_COMPRESSION_BLOCK_SIZE,ckb);
+
+ AbstractCassandraStoreManager mgr = openStorageManager(config);
+
+ // N.B.: clearStorage() truncates CFs but does not delete them
+ mgr.openDatabase(cf);
+
+ final Map<String, String> expected = ImmutableMap
+ .<String, String> builder()
+ .put("sstable_compression",
+ DEFAULT_COMPRESSOR_PACKAGE + "." + cname)
+ .put("chunk_length_kb", String.valueOf(ckb)).build();
+
+ assertEquals(expected, mgr.getCompressionOptions(cf));
+ }
+
+ @Test
+ public void testDisableCFCompressor() throws BackendException {
+
+ final String cf = TEST_CF_NAME + "_nocompress";
+
+ ModifiableConfiguration config = getBaseStorageConfiguration();
+ config.set(AbstractCassandraStoreManager.CF_COMPRESSION,false);
+ AbstractCassandraStoreManager mgr = openStorageManager(config);
+
+ // N.B.: clearStorage() truncates CFs but does not delete them
+ mgr.openDatabase(cf);
+
+ assertEquals(Collections.emptyMap(), mgr.getCompressionOptions(cf));
+ }
+
+ @Test
+ public void testTTLSupported() throws Exception {
+ StoreFeatures features = manager.getFeatures();
+ assertTrue(features.hasCellTTL());
+ }
+
+ @Override
+ public AbstractCassandraStoreManager openStorageManager() throws BackendException {
+ return openStorageManager(getBaseStorageConfiguration());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java
new file mode 100644
index 0000000..c602f97
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java
@@ -0,0 +1,79 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import com.thinkaurelius.titan.diskstorage.util.StandardBaseTransactionConfig;
+import com.thinkaurelius.titan.diskstorage.util.time.TimestampProviders;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import org.junit.Test;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY;
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY;
+import static org.junit.Assert.*;
+
+public class CassandraTransactionTest {
+
+ /* testRead/WriteConsistencyLevel have unnecessary code duplication
+ * that could be avoided by creating a common helper method that takes
+ * a ConfigOption parameter and a function that converts a
+ * CassandraTransaction to a consistency level by calling either
+ * ct.getReadConsistencyLevel() or .getWriteConsistencyLevel(),
+ * but it doesn't seem worth the complexity.
+ */
+
+ @Test
+ public void testWriteConsistencyLevel() {
+ int levelsChecked = 0;
+
+ // Test whether CassandraTransaction honors the write consistency level option
+ for (CLevel writeLevel : CLevel.values()) {
+ StandardBaseTransactionConfig.Builder b = new StandardBaseTransactionConfig.Builder();
+ ModifiableConfiguration mc = GraphDatabaseConfiguration.buildGraphConfiguration();
+ mc.set(CASSANDRA_WRITE_CONSISTENCY, writeLevel.name());
+ b.customOptions(mc);
+ b.timestampProvider(TimestampProviders.MICRO);
+ CassandraTransaction ct = new CassandraTransaction(b.build());
+ assertEquals(writeLevel, ct.getWriteConsistencyLevel());
+ levelsChecked++;
+ }
+
+ // Sanity check: if CLevel.values was empty, something is wrong with the test
+ Preconditions.checkState(0 < levelsChecked);
+ }
+
+ @Test
+ public void testReadConsistencyLevel() {
+ int levelsChecked = 0;
+
+ // Test whether CassandraTransaction honors the write consistency level option
+ for (CLevel writeLevel : CLevel.values()) {
+ StandardBaseTransactionConfig.Builder b = new StandardBaseTransactionConfig.Builder();
+ ModifiableConfiguration mc = GraphDatabaseConfiguration.buildGraphConfiguration();
+ mc.set(CASSANDRA_READ_CONSISTENCY, writeLevel.name());
+ b.timestampProvider(TimestampProviders.MICRO);
+ b.customOptions(mc);
+ CassandraTransaction ct = new CassandraTransaction(b.build());
+ assertEquals(writeLevel, ct.getReadConsistencyLevel());
+ levelsChecked++;
+ }
+
+ // Sanity check: if CLevel.values was empty, something is wrong with the test
+ Preconditions.checkState(0 < levelsChecked);
+ }
+
+ @Test
+ public void testTimestampProvider() {
+ BaseTransactionConfig txcfg = StandardBaseTransactionConfig.of(TimestampProviders.NANO);
+ CassandraTransaction ct = new CassandraTransaction(txcfg);
+ assertEquals(TimestampProviders.NANO, ct.getConfiguration().getTimestampProvider());
+
+ txcfg = StandardBaseTransactionConfig.of(TimestampProviders.MICRO);
+ ct = new CassandraTransaction(txcfg);
+ assertEquals(TimestampProviders.MICRO, ct.getConfiguration().getTimestampProvider());
+
+ txcfg = StandardBaseTransactionConfig.of(TimestampProviders.MILLI);
+ ct = new CassandraTransaction(txcfg);
+ assertEquals(TimestampProviders.MILLI, ct.getConfiguration().getTimestampProvider());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java
new file mode 100644
index 0000000..356e8ce
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java
@@ -0,0 +1,34 @@
+package com.thinkaurelius.titan.diskstorage.cassandra;
+
+import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.testcategory.StandaloneTests;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({StandaloneTests.class})
+public class UUIDTest {
+ public static final String z = "00000000-0000-1000-0000-000000000000";
+ public static final String v = "9451e273-7753-11e0-92df-e700f669bcfc";
+
+ @Test
+ public void timeUUIDComparison() {
+ TimeUUIDType ti = TimeUUIDType.instance;
+
+ UUID zu = UUID.fromString(z);
+ UUID vu = UUID.fromString(v);
+
+ ByteBuffer zb = ti.decompose(zu);
+ ByteBuffer vb = ti.decompose(vu);
+
+ assertEquals(-1, ti.compare(zb, vb));
+ assertEquals(1, zu.compareTo(vu));
+ assertEquals(1, ti.compare(vb, zb));
+ assertEquals(-1, vu.compareTo(zu));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java
new file mode 100644
index 0000000..662b60a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.IDAuthorityTest;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.junit.BeforeClass;
+
+public class AstyanaxIDAuthorityTest extends IDAuthorityTest {
+
+ public AstyanaxIDAuthorityTest(WriteConfiguration baseConfig) {
+ super(baseConfig);
+ }
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java
new file mode 100644
index 0000000..9e47088
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class AstyanaxLockStoreTest extends LockKeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException {
+ return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java
new file mode 100644
index 0000000..339261f
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest;
+import com.thinkaurelius.titan.testcategory.SerialTests;
+
+@Category(SerialTests.class)
+public class AstyanaxLogTest extends KCVSLogTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName()));
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java
new file mode 100644
index 0000000..6de8238
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class AstyanaxMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java
new file mode 100644
index 0000000..48213b0
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import com.thinkaurelius.titan.testcategory.CassandraSSLTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ CassandraSSLTests.class })
+public class AstyanaxSSLStoreTest extends AstyanaxStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public ModifiableConfiguration getBaseStorageConfiguration() {
+ return CassandraStorageSetup.getAstyanaxSSLConfiguration(getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java
new file mode 100644
index 0000000..3719ff6
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java
@@ -0,0 +1,28 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.astyanax;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+
+public class AstyanaxStoreTest extends AbstractCassandraStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public ModifiableConfiguration getBaseStorageConfiguration() {
+ return CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName());
+ }
+
+ @Override
+ public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException {
+ return new AstyanaxStoreManager(c);
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java
new file mode 100644
index 0000000..252889a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java
@@ -0,0 +1,19 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.IDAuthorityTest;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class EmbeddedIDAuthorityTest extends IDAuthorityTest {
+
+ public EmbeddedIDAuthorityTest(WriteConfiguration baseConfig) {
+ super(baseConfig);
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java
new file mode 100644
index 0000000..960b802
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java
@@ -0,0 +1,14 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class EmbeddedLockStoreTest extends LockKeyColumnValueStoreTest {
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException {
+ return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java
new file mode 100644
index 0000000..36436e7
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java
@@ -0,0 +1,24 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest;
+import com.thinkaurelius.titan.testcategory.SerialTests;
+
+@Category(SerialTests.class)
+public class EmbeddedLogTest extends KCVSLogTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java
new file mode 100644
index 0000000..d4608c9
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java
@@ -0,0 +1,15 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class EmbeddedMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest {
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java
new file mode 100644
index 0000000..e2a9bc0
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java
@@ -0,0 +1,42 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.embedded;
+
+import static org.junit.Assert.assertTrue;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.testcategory.OrderedKeyStoreTests;
+
+public class EmbeddedStoreTest extends AbstractCassandraStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public ModifiableConfiguration getBaseStorageConfiguration() {
+ return CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName());
+ }
+
+ @Override
+ public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException {
+ return new CassandraEmbeddedStoreManager(c);
+ }
+
+ @Test
+ @Category({ OrderedKeyStoreTests.class })
+ public void testConfiguration() {
+ StoreFeatures features = manager.getFeatures();
+ assertTrue(features.isKeyOrdered());
+ assertTrue(features.hasLocalKeyPartition());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java
new file mode 100644
index 0000000..9e16585
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java
@@ -0,0 +1,30 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.DistributedStoreManagerTest;
+
+public class ThriftDistributedStoreManagerTest extends DistributedStoreManagerTest<CassandraThriftStoreManager> {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Before
+ public void setUp() throws BackendException {
+ manager = new CassandraThriftStoreManager(
+ CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()));
+ store = manager.openDatabase("distributedcf");
+ }
+
+ @After
+ public void tearDown() throws BackendException {
+ if (null != manager)
+ manager.close();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java
new file mode 100644
index 0000000..9e1881a
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.IDAuthorityTest;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.junit.BeforeClass;
+
+public class ThriftIDAuthorityTest extends IDAuthorityTest {
+
+ public ThriftIDAuthorityTest(WriteConfiguration baseConfig) {
+ super(baseConfig);
+ }
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java
new file mode 100644
index 0000000..223d399
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class ThriftLockStoreTest extends LockKeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException {
+ return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java
new file mode 100644
index 0000000..7123b5f
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest;
+import com.thinkaurelius.titan.testcategory.SerialTests;
+
+@Category(SerialTests.class)
+public class ThriftLogTest extends KCVSLogTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()));
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java
new file mode 100644
index 0000000..cbfce61
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+import org.junit.BeforeClass;
+
+public class ThriftMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()));
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java
new file mode 100644
index 0000000..ffc93b5
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java
@@ -0,0 +1,22 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import com.thinkaurelius.titan.testcategory.CassandraSSLTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ CassandraSSLTests.class })
+public class ThriftSSLStoreTest extends ThriftStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public ModifiableConfiguration getBaseStorageConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftSSLConfiguration(this.getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java
new file mode 100644
index 0000000..8254530
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java
@@ -0,0 +1,28 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
+import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+
+public class ThriftStoreTest extends AbstractCassandraStoreTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public ModifiableConfiguration getBaseStorageConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName());
+ }
+
+ @Override
+ public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException {
+ return new CassandraThriftStoreManager(c);
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java
new file mode 100644
index 0000000..0aab994
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java
@@ -0,0 +1,95 @@
+package com.thinkaurelius.titan.graphdb;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.core.TitanFactory;
+import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
+import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.*;
+
+/**
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public abstract class CassandraGraphTest extends TitanGraphTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ protected boolean isLockingOptimistic() {
+ return true;
+ }
+
+ @Test
+ public void testHasTTL() throws Exception {
+ assertTrue(features.hasCellTTL());
+ }
+
+ @Test
+ public void testGraphConfigUsedByThreadBoundTx() {
+ close();
+ WriteConfiguration wc = getConfiguration();
+ wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ALL");
+ wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "LOCAL_QUORUM");
+
+ graph = (StandardTitanGraph) TitanFactory.open(wc);
+
+ StandardTitanTx tx = (StandardTitanTx)graph.getCurrentThreadTx();
+ assertEquals("ALL",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY));
+ assertEquals("LOCAL_QUORUM",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY));
+ }
+
+ @Test
+ public void testGraphConfigUsedByTx() {
+ close();
+ WriteConfiguration wc = getConfiguration();
+ wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "TWO");
+ wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "THREE");
+
+ graph = (StandardTitanGraph) TitanFactory.open(wc);
+
+ StandardTitanTx tx = (StandardTitanTx)graph.newTransaction();
+ assertEquals("TWO",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY));
+ assertEquals("THREE",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY));
+ tx.rollback();
+ }
+
+ @Test
+ public void testCustomConfigUsedByTx() {
+ close();
+ WriteConfiguration wc = getConfiguration();
+ wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ALL");
+ wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "ALL");
+
+ graph = (StandardTitanGraph) TitanFactory.open(wc);
+
+ StandardTitanTx tx = (StandardTitanTx)graph.buildTransaction()
+ .customOption(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ONE")
+ .customOption(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "TWO").start();
+
+ assertEquals("ONE",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY));
+ assertEquals("TWO",
+ tx.getTxHandle().getBaseTransactionConfig().getCustomOptions()
+ .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY));
+ tx.rollback();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java
new file mode 100644
index 0000000..3a3c0b8
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java
@@ -0,0 +1,24 @@
+package com.thinkaurelius.titan.graphdb.astyanax;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest;
+import com.thinkaurelius.titan.testcategory.PerformanceTests;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({PerformanceTests.class})
+public class AstyanaxGraphConcurrentTest extends TitanGraphConcurrentTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java
new file mode 100644
index 0000000..c20e837
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.astyanax;
+
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest;
+
+public class AstyanaxGraphPerformanceMemoryTest extends TitanGraphPerformanceMemoryTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java
new file mode 100644
index 0000000..70f595e
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java
@@ -0,0 +1,13 @@
+package com.thinkaurelius.titan.graphdb.astyanax;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.CassandraGraphTest;
+
+public class AstyanaxGraphTest extends CassandraGraphTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java
new file mode 100644
index 0000000..c8fb206
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.astyanax;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphTest;
+import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest;
+import org.junit.BeforeClass;
+
+public class AstyanaxPartitionGraphTest extends TitanPartitionGraphTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getBaseConfiguration() {
+ return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java
new file mode 100644
index 0000000..4a4fc0f
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.embedded;
+
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest;
+
+public class EmbeddedEventualGraphTest extends TitanEventualGraphTest {
+
+ @BeforeClass
+ public static void startEmbeddedCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java
new file mode 100644
index 0000000..35cc582
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java
@@ -0,0 +1,28 @@
+package com.thinkaurelius.titan.graphdb.embedded;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest;
+import com.thinkaurelius.titan.testcategory.PerformanceTests;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+
+@Category({PerformanceTests.class})
+public class EmbeddedGraphConcurrentTest extends TitanGraphConcurrentTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java
new file mode 100644
index 0000000..cd9d75d
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.graphdb.embedded;
+
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+
+public class EmbeddedGraphMemoryPerformanceTest extends TitanGraphPerformanceMemoryTest {
+
+ @BeforeClass
+ public static void startCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java
new file mode 100644
index 0000000..a602e71
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java
@@ -0,0 +1,13 @@
+package com.thinkaurelius.titan.graphdb.embedded;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.CassandraGraphTest;
+
+public class EmbeddedGraphTest extends CassandraGraphTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java
new file mode 100644
index 0000000..4851b3f
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.embedded;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest;
+import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest;
+import org.junit.BeforeClass;
+
+public class EmbeddedPartitionGraphTest extends TitanPartitionGraphTest {
+
+ @BeforeClass
+ public static void startEmbeddedCassandra() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getBaseConfiguration() {
+ return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java
new file mode 100644
index 0000000..8fe1362
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest;
+import com.thinkaurelius.titan.graphdb.TitanGraphTest;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+public class ThriftEventualGraphTest extends TitanEventualGraphTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java
new file mode 100644
index 0000000..17848d4
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java
@@ -0,0 +1,28 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.StorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphTest;
+import org.junit.BeforeClass;
+
+public class ThriftGraphCacheTest extends TitanGraphTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return StorageSetup.addPermanentCache(CassandraStorageSetup.getCassandraThriftConfiguration(getClass().getSimpleName()));
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+
+
+ @Override
+ protected boolean isLockingOptimistic() {
+ return true;
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java
new file mode 100644
index 0000000..f1d4fe4
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java
@@ -0,0 +1,24 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest;
+import com.thinkaurelius.titan.testcategory.PerformanceTests;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({PerformanceTests.class})
+public class ThriftGraphConcurrentTest extends TitanGraphConcurrentTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java
new file mode 100644
index 0000000..dd45c42
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java
@@ -0,0 +1,30 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.CassandraThriftStoreManager;
+import com.thinkaurelius.titan.diskstorage.configuration.BasicConfiguration;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.graphdb.TitanGraphIterativeBenchmark;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import org.junit.BeforeClass;
+
+public class ThriftGraphIterativeTest extends TitanGraphIterativeBenchmark {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws BackendException {
+ return new CassandraThriftStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,getConfiguration(), BasicConfiguration.Restriction.NONE));
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java
new file mode 100644
index 0000000..893b6c8
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java
@@ -0,0 +1,22 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.junit.BeforeClass;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest;
+
+public class ThriftGraphPerformanceMemoryTest extends TitanGraphPerformanceMemoryTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java
new file mode 100644
index 0000000..6f01f0b
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java
@@ -0,0 +1,56 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.diskstorage.BackendException;
+import com.thinkaurelius.titan.graphdb.SpeedTestSchema;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.core.TitanFactory;
+import com.thinkaurelius.titan.graphdb.TitanGraphSpeedTest;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
+import com.thinkaurelius.titan.testcategory.PerformanceTests;
+
+@Category({PerformanceTests.class})
+public class ThriftGraphSpeedTest extends TitanGraphSpeedTest {
+
+ private static StandardTitanGraph graph;
+ private static SpeedTestSchema schema;
+
+ private static final Logger log =
+ LoggerFactory.getLogger(ThriftGraphSpeedTest.class);
+
+ public ThriftGraphSpeedTest() throws BackendException {
+ super(CassandraStorageSetup.getCassandraThriftGraphConfiguration(ThriftGraphSpeedTest.class.getSimpleName()));
+ }
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ protected StandardTitanGraph getGraph() throws BackendException {
+
+
+ if (null == graph) {
+ GraphDatabaseConfiguration graphconfig = new GraphDatabaseConfiguration(conf);
+ graphconfig.getBackend().clearStorage();
+ log.debug("Cleared backend storage");
+ graph = (StandardTitanGraph)TitanFactory.open(conf);
+ initializeGraph(graph);
+ }
+ return graph;
+ }
+
+ @Override
+ protected SpeedTestSchema getSchema() {
+ if (null == schema) {
+ schema = SpeedTestSchema.get();
+ }
+ return schema;
+ }
+} \ No newline at end of file
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java
new file mode 100644
index 0000000..e9a9ac9
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java
@@ -0,0 +1,13 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.CassandraGraphTest;
+
+public class ThriftGraphTest extends CassandraGraphTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java
new file mode 100644
index 0000000..8679da9
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java
@@ -0,0 +1,20 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.olap.OLAPTest;
+import org.junit.BeforeClass;
+
+public class ThriftOLAPTest extends OLAPTest {
+
+ @Override
+ public WriteConfiguration getConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java
new file mode 100644
index 0000000..9bae61d
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanOperationCountingTest;
+import org.junit.BeforeClass;
+
+public class ThriftOperationCountingTest extends TitanOperationCountingTest {
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getBaseConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+ @Override
+ public boolean storeUsesConsistentKeyLocker() {
+ return true;
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java
new file mode 100644
index 0000000..728e338
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.graphdb.thrift;
+
+import com.thinkaurelius.titan.CassandraStorageSetup;
+import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration;
+import com.thinkaurelius.titan.graphdb.TitanOperationCountingTest;
+import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest;
+import org.junit.BeforeClass;
+
+public class ThriftPartitionGraphTest extends TitanPartitionGraphTest {
+
+ @BeforeClass
+ public static void beforeClass() {
+ CassandraStorageSetup.startCleanEmbedded();
+ }
+
+ @Override
+ public WriteConfiguration getBaseConfiguration() {
+ return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName());
+ }
+
+}
diff --git a/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java b/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java
new file mode 100644
index 0000000..d1e3dd8
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java
@@ -0,0 +1,10 @@
+package com.thinkaurelius.titan.testcategory;
+
+/**
+ * This is a JUnit category for tests that need to run against Cassandra
+ * configured for SSL-based client authentication.
+ *
+ * If you rename or move this class, then you must also update mentions of it in
+ * the Cassandra module's pom.xml.
+ */
+public interface CassandraSSLTests { }
diff --git a/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java b/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java
new file mode 100644
index 0000000..722c314
--- /dev/null
+++ b/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java
@@ -0,0 +1,9 @@
+package com.thinkaurelius.titan.testcategory;
+
+/**
+ * This is a JUnit category for tests that don't need a Cassandra server.
+ *
+ * If you rename or move this class, then you must also update mentions of it in
+ * the Cassandra module's pom.xml.
+ */
+public class StandaloneTests { }
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
new file mode 100644
index 0000000..271e198
--- /dev/null
+++ b/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# A1 is a FileAppender.
+log4j.appender.A1=org.apache.log4j.FileAppender
+log4j.appender.A1.File=target/test.log
+log4j.appender.A1.Threshold=TRACE
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{2}: %m%n
+
+# A2 is a ConsoleAppender.
+log4j.appender.A2=org.apache.log4j.ConsoleAppender
+log4j.appender.A2.Threshold=WARN
+# A2 uses PatternLayout.
+log4j.appender.A2.layout=org.apache.log4j.PatternLayout
+log4j.appender.A2.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{2}: %m%n
+
+# Set both appenders (A1 and A2) on the root logger.
+log4j.rootLogger=INFO, A1, A2
+#log4j.rootLogger=INFO, A1
+
+# Restrict some of Titan's dependencies to INFO and scarier.
+# These restrictions are useful when reducing the severity threshold
+# setting on one of the appenders below INFO.
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
+# Disable all messages from ExpectedValueCheckingTransaction. The point is to
+# suppress scary-looking ERROR messages that are deliberately induced by
+# LockKeyColumnValueStoreTest.
+log4j.logger.com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction=OFF
+# Lower dblog-related message thresholds
+log4j.logger.com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog=DEBUG
+log4j.logger.com.thinkaurelius.titan.diskstorage.log.LogTest=DEBUG
+log4j.logger.com.thinkaurelius.titan.diskstorage.log.KCVSLogTest=DEBUG
+log4j.logger.com.thinkaurelius.titan.diskstorage.log.util.ProcessMessageJob=DEBUG
+log4j.logger.com.thinkaurelius.titan.diskstorage.util.time.Timestamps=DEBUG
diff --git a/src/test/resources/rexster-fragment.xml b/src/test/resources/rexster-fragment.xml
new file mode 100644
index 0000000..19f88f9
--- /dev/null
+++ b/src/test/resources/rexster-fragment.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<graph>
+ <graph-read-only>false</graph-read-only>
+ <graph-location>home</graph-location>
+ <properties>
+ <storage.backend>local</storage.backend>
+ </properties>
+ <extensions>
+ <allows>
+ <allow>tp:gremlin</allow>
+ </allows>
+ </extensions>
+</graph>