diff options
author | Michael Lando <ml636r@att.com> | 2017-05-14 11:32:52 +0300 |
---|---|---|
committer | Michael Lando <ml636r@att.com> | 2017-05-14 11:46:35 +0300 |
commit | 6921632f70d9321b7e37c2a00618a21bccc0e765 (patch) | |
tree | 6789a7688e2fc9275c7ca63071ac2068c49b1746 /src |
first commit for new repo
Change-Id: I307341ad768d38a60fb49f96b5b40186c0bd1205
Signed-off-by: Michael Lando <ml636r@att.com>
Diffstat (limited to 'src')
68 files changed, 5769 insertions, 0 deletions
diff --git a/src/assembly/distribution.xml b/src/assembly/distribution.xml new file mode 100644 index 0000000..72ad2f0 --- /dev/null +++ b/src/assembly/distribution.xml @@ -0,0 +1,80 @@ +<assembly> + <id>distribution</id> + <formats> + <format>zip</format> + </formats> + <fileSets> + <fileSet> + <fileMode>0775</fileMode> + <directory>../titan-all/src/main/bin</directory> + <outputDirectory>bin</outputDirectory> + <includes> + <include>*.sh</include> + <include>*.bat</include> + </includes> + </fileSet> + <fileSet> + <directory>src</directory> + </fileSet> + <fileSet> + <directory>../doc</directory> + </fileSet> + <fileSet> + <directory>../config</directory> + </fileSet> + <fileSet> + <directory>../target/site/apidocs</directory> + <outputDirectory>doc/javadoc</outputDirectory> + </fileSet> + <fileSet> + <directory>target/*.jar</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>../bin</directory> + <outputDirectory>bin</outputDirectory> + <includes> + <include>cassandra*</include> + </includes> + </fileSet> + </fileSets> + <files> + <file> + <source>pom.xml</source> + <outputDirectory>src</outputDirectory> + </file> + <file> + <source>../LICENSE.txt</source> + <outputDirectory>/</outputDirectory> + </file> + <file> + <source>../CHANGELOG.textile</source> + <outputDirectory>/</outputDirectory> + </file> + <file> + <source>../UPGRADE.textile</source> + <outputDirectory>/</outputDirectory> + </file> + <file> + <source>../NOTICE.txt</source> + <outputDirectory>/</outputDirectory> + </file> + <file> + <source>../titan-all/src/main/bin/README.txt</source> + <outputDirectory>/</outputDirectory> + </file> + </files> + + <dependencySets> + <dependencySet> + <outputDirectory>/lib</outputDirectory> + <unpack>false</unpack> + <scope>compile</scope> + </dependencySet> + <dependencySet> + <outputDirectory>/lib</outputDirectory> + <unpack>false</unpack> + <scope>provided</scope> + </dependencySet> + </dependencySets> +</assembly> diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java new file mode 100644 index 0000000..1e12ada --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java @@ -0,0 +1,297 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import java.util.*; + +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.core.TitanException; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; + +import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*; + +import org.apache.cassandra.dht.IPartitioner; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@PreInitializeConfigOptions +public abstract class AbstractCassandraStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager { + + public enum Partitioner { + + RANDOM, BYTEORDER; + + public static Partitioner getPartitioner(IPartitioner partitioner) { + return getPartitioner(partitioner.getClass().getSimpleName()); + } + + public static Partitioner getPartitioner(String className) { + if (className.endsWith("RandomPartitioner") || className.endsWith("Murmur3Partitioner")) + return Partitioner.RANDOM; + else if (className.endsWith("ByteOrderedPartitioner")) return Partitioner.BYTEORDER; + else throw new IllegalArgumentException("Unsupported partitioner: " + className); + } + } + + //################### CASSANDRA SPECIFIC CONFIGURATION OPTIONS ###################### + + public static final ConfigNamespace CASSANDRA_NS = + new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "cassandra", "Cassandra storage backend options"); + + public static final ConfigOption<String> CASSANDRA_KEYSPACE = + new ConfigOption<String>(CASSANDRA_NS, "keyspace", + "The name of Titan's keyspace. It will be created if it does not exist.", + ConfigOption.Type.LOCAL, "titan"); + + // Consistency Levels and Atomic Batch + public static final ConfigOption<String> CASSANDRA_READ_CONSISTENCY = + new ConfigOption<String>(CASSANDRA_NS, "read-consistency-level", + "The consistency level of read operations against Cassandra", + ConfigOption.Type.MASKABLE, "LOCAL_QUORUM"); + + public static final ConfigOption<String> CASSANDRA_WRITE_CONSISTENCY = + new ConfigOption<String>(CASSANDRA_NS, "write-consistency-level", + "The consistency level of write operations against Cassandra", + ConfigOption.Type.MASKABLE, "LOCAL_QUORUM"); + + public static final ConfigOption<Boolean> ATOMIC_BATCH_MUTATE = + new ConfigOption<Boolean>(CASSANDRA_NS, "atomic-batch-mutate", + "True to use Cassandra atomic batch mutation, false to use non-atomic batches", + ConfigOption.Type.MASKABLE, true); + + // Replication + public static final ConfigOption<Integer> REPLICATION_FACTOR = + new ConfigOption<Integer>(CASSANDRA_NS, "replication-factor", + "The number of data replicas (including the original copy) that should be kept. " + + "This is only meaningful for storage backends that natively support data replication.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<String> REPLICATION_STRATEGY = + new ConfigOption<String>(CASSANDRA_NS, "replication-strategy-class", + "The replication strategy to use for Titan keyspace", + ConfigOption.Type.FIXED, "org.apache.cassandra.locator.SimpleStrategy"); + + public static final ConfigOption<String[]> REPLICATION_OPTIONS = + new ConfigOption<String[]>(CASSANDRA_NS, "replication-strategy-options", + "Replication strategy options, e.g. factor or replicas per datacenter. This list is interpreted as a " + + "map. It must have an even number of elements in [key,val,key,val,...] form. A replication_factor set " + + "here takes precedence over one set with " + ConfigElement.getPath(REPLICATION_FACTOR), + ConfigOption.Type.FIXED, String[].class); + + // Compression + public static final ConfigOption<Boolean> CF_COMPRESSION = + new ConfigOption<Boolean>(CASSANDRA_NS, "compression", + "Whether the storage backend should use compression when storing the data", ConfigOption.Type.FIXED, true); + + public static final ConfigOption<String> CF_COMPRESSION_TYPE = + new ConfigOption<String>(CASSANDRA_NS, "compression-type", + "The sstable_compression value Titan uses when creating column families. " + + "This accepts any value allowed by Cassandra's sstable_compression option. " + + "Leave this unset to disable sstable_compression on Titan-created CFs.", + ConfigOption.Type.MASKABLE, "LZ4Compressor"); + + public static final ConfigOption<Integer> CF_COMPRESSION_BLOCK_SIZE = + new ConfigOption<Integer>(CASSANDRA_NS, "compression-block-size", + "The size of the compression blocks in kilobytes", ConfigOption.Type.FIXED, 64); + + // SSL + public static final ConfigNamespace SSL_NS = + new ConfigNamespace(CASSANDRA_NS, "ssl", "Configuration options for SSL"); + + public static final ConfigNamespace SSL_TRUSTSTORE_NS = + new ConfigNamespace(SSL_NS, "truststore", "Configuration options for SSL Truststore."); + + public static final ConfigOption<Boolean> SSL_ENABLED = + new ConfigOption<Boolean>(SSL_NS, "enabled", + "Controls use of the SSL connection to Cassandra", ConfigOption.Type.LOCAL, false); + + public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION = + new ConfigOption<String>(SSL_TRUSTSTORE_NS, "location", + "Marks the location of the SSL Truststore.", ConfigOption.Type.LOCAL, ""); + + public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = + new ConfigOption<String>(SSL_TRUSTSTORE_NS, "password", + "The password to access SSL Truststore.", ConfigOption.Type.LOCAL, ""); + + // Thrift transport + public static final ConfigOption<Integer> THRIFT_FRAME_SIZE_MB = + new ConfigOption<>(CASSANDRA_NS, "frame-size-mb", + "The thrift frame size in megabytes", ConfigOption.Type.MASKABLE, 15); + + /** + * The default Thrift port used by Cassandra. Set + * {@link GraphDatabaseConfiguration#STORAGE_PORT} to override. + * <p> + * Value = {@value} + */ + public static final int PORT_DEFAULT = 9160; + + public static final String SYSTEM_KS = "system"; + + protected final String keySpaceName; + protected final Map<String, String> strategyOptions; + + protected final boolean compressionEnabled; + protected final int compressionChunkSizeKB; + protected final String compressionClass; + + protected final boolean atomicBatch; + + protected final int thriftFrameSizeBytes; + + private volatile StoreFeatures features = null; + private Partitioner partitioner = null; + + private static final Logger log = + LoggerFactory.getLogger(AbstractCassandraStoreManager.class); + + public AbstractCassandraStoreManager(Configuration config) { + super(config, PORT_DEFAULT); + + this.keySpaceName = config.get(CASSANDRA_KEYSPACE); + this.compressionEnabled = config.get(CF_COMPRESSION); + this.compressionChunkSizeKB = config.get(CF_COMPRESSION_BLOCK_SIZE); + this.compressionClass = config.get(CF_COMPRESSION_TYPE); + this.atomicBatch = config.get(ATOMIC_BATCH_MUTATE); + this.thriftFrameSizeBytes = config.get(THRIFT_FRAME_SIZE_MB) * 1024 * 1024; + + // SSL truststore location sanity check + if (config.get(SSL_ENABLED) && config.get(SSL_TRUSTSTORE_LOCATION).isEmpty()) + throw new IllegalArgumentException(SSL_TRUSTSTORE_LOCATION.getName() + " could not be empty when SSL is enabled."); + + if (config.has(REPLICATION_OPTIONS)) { + String[] options = config.get(REPLICATION_OPTIONS); + + if (options.length % 2 != 0) + throw new IllegalArgumentException(REPLICATION_OPTIONS.getName() + " should have even number of elements."); + + Map<String, String> converted = new HashMap<String, String>(options.length / 2); + + for (int i = 0; i < options.length; i += 2) { + converted.put(options[i], options[i + 1]); + } + + this.strategyOptions = ImmutableMap.copyOf(converted); + } else { + this.strategyOptions = ImmutableMap.of("replication_factor", String.valueOf(config.get(REPLICATION_FACTOR))); + } + } + + public final Partitioner getPartitioner() { + if (partitioner == null) { + try { + partitioner = Partitioner.getPartitioner(getCassandraPartitioner()); + } catch (BackendException e) { + throw new TitanException("Could not connect to Cassandra to read partitioner information. Please check the connection", e); + } + } + assert partitioner != null; + return partitioner; + } + + public abstract IPartitioner getCassandraPartitioner() throws BackendException; + + @Override + public StoreTransaction beginTransaction(final BaseTransactionConfig config) { + return new CassandraTransaction(config); + } + + @Override + public String toString() { + return "[" + keySpaceName + "@" + super.toString() + "]"; + } + + @Override + public StoreFeatures getFeatures() { + + if (features == null) { + + Configuration global = GraphDatabaseConfiguration.buildGraphConfiguration() + .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM") + .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM") + .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT); + + Configuration local = GraphDatabaseConfiguration.buildGraphConfiguration() + .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM") + .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM") + .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT); + + StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder(); + + fb.batchMutation(true).distributed(true); + fb.timestamps(true).cellTTL(true); + fb.keyConsistent(global, local); + + boolean keyOrdered; + + switch (getPartitioner()) { + case RANDOM: + keyOrdered = false; + fb.keyOrdered(keyOrdered).orderedScan(false).unorderedScan(true); + break; + + case BYTEORDER: + keyOrdered = true; + fb.keyOrdered(keyOrdered).orderedScan(true).unorderedScan(false); + break; + + default: + throw new IllegalArgumentException("Unrecognized partitioner: " + getPartitioner()); + } + + switch (getDeployment()) { + case REMOTE: + fb.multiQuery(true); + break; + + case LOCAL: + fb.multiQuery(true).localKeyPartition(keyOrdered); + break; + + case EMBEDDED: + fb.multiQuery(false).localKeyPartition(keyOrdered); + break; + + default: + throw new IllegalArgumentException("Unrecognized deployment mode: " + getDeployment()); + } + + features = fb.build(); + } + + return features; + } + + /** + * Returns a map of compression options for the column family {@code cf}. + * The contents of the returned map must be identical to the contents of the + * map returned by + * {@link org.apache.cassandra.thrift.CfDef#getCompression_options()}, even + * for implementations of this method that don't use Thrift. + * + * @param cf the name of the column family for which to return compression + * options + * @return map of compression option names to compression option values + * @throws com.thinkaurelius.titan.diskstorage.BackendException if reading from Cassandra fails + */ + public abstract Map<String, String> getCompressionOptions(String cf) throws BackendException; + + public String getName() { + return getClass().getSimpleName() + keySpaceName; + } + +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java new file mode 100644 index 0000000..f3b1ca8 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevel.java @@ -0,0 +1,59 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import com.google.common.base.Preconditions; + +/** + * This enum unites different libraries' consistency level enums, streamlining + * configuration and processing in {@link AbstractCassandraStoreManager}. + * + */ +public enum CLevel implements CLevelInterface { // One ring to rule them all + ANY, + ONE, + TWO, + THREE, + QUORUM, + ALL, + LOCAL_QUORUM, + EACH_QUORUM; + + private final org.apache.cassandra.db.ConsistencyLevel db; + private final org.apache.cassandra.thrift.ConsistencyLevel thrift; + private final com.netflix.astyanax.model.ConsistencyLevel astyanax; + + private CLevel() { + db = org.apache.cassandra.db.ConsistencyLevel.valueOf(toString()); + thrift = org.apache.cassandra.thrift.ConsistencyLevel.valueOf(toString()); + astyanax = com.netflix.astyanax.model.ConsistencyLevel.valueOf("CL_" + toString()); + } + + @Override + public org.apache.cassandra.db.ConsistencyLevel getDB() { + return db; + } + + @Override + public org.apache.cassandra.thrift.ConsistencyLevel getThrift() { + return thrift; + } + + @Override + public com.netflix.astyanax.model.ConsistencyLevel getAstyanax() { + return astyanax; + } + + public static CLevel parse(String value) { + Preconditions.checkArgument(value != null && !value.isEmpty()); + value = value.trim(); + if (value.equals("1")) return ONE; + else if (value.equals("2")) return TWO; + else if (value.equals("3")) return THREE; + else { + for (CLevel c : values()) { + if (c.toString().equalsIgnoreCase(value) || + ("CL_" + c.toString()).equalsIgnoreCase(value)) return c; + } + } + throw new IllegalArgumentException("Unrecognized cassandra consistency level: " + value); + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java new file mode 100644 index 0000000..76d7ede --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CLevelInterface.java @@ -0,0 +1,10 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +public interface CLevelInterface { + + public org.apache.cassandra.db.ConsistencyLevel getDB(); + + public org.apache.cassandra.thrift.ConsistencyLevel getThrift(); + + public com.netflix.astyanax.model.ConsistencyLevel getAstyanax(); +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java new file mode 100644 index 0000000..e94c08f --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransaction.java @@ -0,0 +1,53 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY; +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; + +public class CassandraTransaction extends AbstractStoreTransaction { + + private static final Logger log = LoggerFactory.getLogger(CassandraTransaction.class); + + private final CLevel read; + private final CLevel write; + + public CassandraTransaction(BaseTransactionConfig c) { + super(c); + read = CLevel.parse(getConfiguration().getCustomOption(CASSANDRA_READ_CONSISTENCY)); + write = CLevel.parse(getConfiguration().getCustomOption(CASSANDRA_WRITE_CONSISTENCY)); + log.debug("Created {}", this.toString()); + } + + public CLevel getReadConsistencyLevel() { + return read; + } + + public CLevel getWriteConsistencyLevel() { + return write; + } + + public static CassandraTransaction getTx(StoreTransaction txh) { + Preconditions.checkArgument(txh != null); + Preconditions.checkArgument(txh instanceof CassandraTransaction, "Unexpected transaction type %s", txh.getClass().getName()); + return (CassandraTransaction) txh; + } + + public String toString() { + StringBuilder sb = new StringBuilder(64); + sb.append("CassandraTransaction@"); + sb.append(Integer.toHexString(hashCode())); + sb.append("[read="); + sb.append(read); + sb.append(",write="); + sb.append(write); + sb.append("]"); + return sb.toString(); + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java new file mode 100644 index 0000000..0d3eb9b --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java @@ -0,0 +1,358 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.google.common.base.Predicate; +import com.google.common.collect.*; +import com.netflix.astyanax.ExceptionCallback; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.model.*; +import com.netflix.astyanax.query.AllRowsQuery; +import com.netflix.astyanax.query.RowSliceQuery; +import com.netflix.astyanax.retry.RetryPolicy; +import com.netflix.astyanax.serializers.ByteBufferSerializer; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.util.RecordIterator; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.*; + +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.Partitioner; +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +public class AstyanaxKeyColumnValueStore implements KeyColumnValueStore { + + private final Keyspace keyspace; + private final String columnFamilyName; + private final ColumnFamily<ByteBuffer, ByteBuffer> columnFamily; + private final RetryPolicy retryPolicy; + private final AstyanaxStoreManager storeManager; + private final AstyanaxGetter entryGetter; + + AstyanaxKeyColumnValueStore(String columnFamilyName, + Keyspace keyspace, + AstyanaxStoreManager storeManager, + RetryPolicy retryPolicy) { + this.keyspace = keyspace; + this.columnFamilyName = columnFamilyName; + this.retryPolicy = retryPolicy; + this.storeManager = storeManager; + + entryGetter = new AstyanaxGetter(storeManager.getMetaDataSchema(columnFamilyName)); + + columnFamily = new ColumnFamily<ByteBuffer, ByteBuffer>( + this.columnFamilyName, + ByteBufferSerializer.get(), + ByteBufferSerializer.get()); + + } + + + ColumnFamily<ByteBuffer, ByteBuffer> getColumnFamily() { + return columnFamily; + } + + @Override + public void close() throws BackendException { + //Do nothing + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh); + return Iterables.getOnlyElement(result.values(),EntryList.EMPTY_LIST); + } + + @Override + public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(keys, query, txh); + } + + public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key, + SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(ImmutableList.of(key),query,txh); + } + + + public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys, + SliceQuery query, StoreTransaction txh) throws BackendException { + /* + * RowQuery<K,C> should be parameterized as + * RowQuery<ByteBuffer,ByteBuffer>. However, this causes the following + * compilation error when attempting to call withColumnRange on a + * RowQuery<ByteBuffer,ByteBuffer> instance: + * + * java.lang.Error: Unresolved compilation problem: The method + * withColumnRange(ByteBuffer, ByteBuffer, boolean, int) is ambiguous + * for the type RowQuery<ByteBuffer,ByteBuffer> + * + * The compiler substitutes ByteBuffer=C for both startColumn and + * endColumn, compares it to its identical twin with that type + * hard-coded, and dies. + * + */ + RowSliceQuery rq = keyspace.prepareQuery(columnFamily) + .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax()) + .withRetryPolicy(retryPolicy.duplicate()) + .getKeySlice(CassandraHelper.convert(keys)); + + // Thank you, Astyanax, for making builder pattern useful :( + rq.withColumnRange(query.getSliceStart().asByteBuffer(), + query.getSliceEnd().asByteBuffer(), + false, + query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column + + OperationResult<Rows<ByteBuffer, ByteBuffer>> r; + try { + r = (OperationResult<Rows<ByteBuffer, ByteBuffer>>) rq.execute(); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + + Rows<ByteBuffer,ByteBuffer> rows = r.getResult(); + Map<StaticBuffer, EntryList> result = new HashMap<StaticBuffer, EntryList>(rows.size()); + + for (Row<ByteBuffer, ByteBuffer> row : rows) { + assert !result.containsKey(row.getKey()); + result.put(StaticArrayBuffer.of(row.getKey()), + CassandraHelper.makeEntryList(row.getColumns(),entryGetter, query.getSliceEnd(), query.getLimit())); + } + + return result; + } + + private static class AstyanaxGetter implements StaticArrayEntry.GetColVal<Column<ByteBuffer>,ByteBuffer> { + + private final EntryMetaData[] schema; + + private AstyanaxGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + + @Override + public ByteBuffer getColumn(Column<ByteBuffer> element) { + return element.getName(); + } + + @Override + public ByteBuffer getValue(Column<ByteBuffer> element) { + return element.getByteBufferValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(Column<ByteBuffer> element) { + return schema; + } + + @Override + public Object getMetaData(Column<ByteBuffer> element, EntryMetaData meta) { + switch(meta) { + case TIMESTAMP: + return element.getTimestamp(); + case TTL: + return element.getTtl(); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { + mutateMany(ImmutableMap.of(key, new KCVMutation(additions, deletions)), txh); + } + + public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(columnFamilyName, mutations), txh); + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException { + if (storeManager.getPartitioner() != Partitioner.RANDOM) + throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used."); + + AllRowsQuery allRowsQuery = keyspace.prepareQuery(columnFamily).getAllRows(); + + if (sliceQuery != null) { + allRowsQuery.withColumnRange(sliceQuery.getSliceStart().asByteBuffer(), + sliceQuery.getSliceEnd().asByteBuffer(), + false, + sliceQuery.getLimit()); + } + + Rows<ByteBuffer, ByteBuffer> result; + try { + /* Note: we need to fetch columns for each row as well to remove "range ghosts" */ + OperationResult op = allRowsQuery.setRowLimit(storeManager.getPageSize()) // pre-fetch that many rows at a time + .setConcurrencyLevel(1) // one execution thread for fetching portion of rows + .setExceptionCallback(new ExceptionCallback() { + private int retries = 0; + + @Override + public boolean onException(ConnectionException e) { + try { + return retries > 2; // make 3 re-tries + } finally { + retries++; + } + } + }).execute(); + + result = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) op).getResult(); + } catch (ConnectionException e) { + throw new PermanentBackendException(e); + } + + return new RowIterator(result.iterator(), sliceQuery); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { + // this query could only be done when byte-ordering partitioner is used + // because Cassandra operates on tokens internally which means that even contiguous + // range of keys (e.g. time slice) with random partitioner could produce disjoint set of tokens + // returning ambiguous results to the user. + Partitioner partitioner = storeManager.getPartitioner(); + if (partitioner != Partitioner.BYTEORDER) + throw new PermanentBackendException("getKeys(KeyRangeQuery could only be used with byte-ordering partitioner."); + + ByteBuffer start = query.getKeyStart().asByteBuffer(), end = query.getKeyEnd().asByteBuffer(); + + RowSliceQuery rowSlice = keyspace.prepareQuery(columnFamily) + .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax()) + .withRetryPolicy(retryPolicy.duplicate()) + .getKeyRange(start, end, null, null, Integer.MAX_VALUE); + + // Astyanax is bad at builder pattern :( + rowSlice.withColumnRange(query.getSliceStart().asByteBuffer(), + query.getSliceEnd().asByteBuffer(), + false, + query.getLimit()); + + // Omit final the query's keyend from the result, if present in result + final Rows<ByteBuffer, ByteBuffer> r; + try { + r = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) rowSlice.execute()).getResult(); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + Iterator<Row<ByteBuffer, ByteBuffer>> i = + Iterators.filter(r.iterator(), new KeySkipPredicate(query.getKeyEnd().asByteBuffer())); + return new RowIterator(i, query); + } + + @Override + public String getName() { + return columnFamilyName; + } + + private static class KeyIterationPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> { + @Override + public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) { + return (row != null) && row.getColumns().size() > 0; + } + } + + private static class KeySkipPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> { + + private final ByteBuffer skip; + + public KeySkipPredicate(ByteBuffer skip) { + this.skip = skip; + } + + @Override + public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) { + return (row != null) && !row.getKey().equals(skip); + } + } + + private class RowIterator implements KeyIterator { + private final Iterator<Row<ByteBuffer, ByteBuffer>> rows; + private Row<ByteBuffer, ByteBuffer> currentRow; + private final SliceQuery sliceQuery; + private boolean isClosed; + + public RowIterator(Iterator<Row<ByteBuffer, ByteBuffer>> rowIter, SliceQuery sliceQuery) { + this.rows = Iterators.filter(rowIter, new KeyIterationPredicate()); + this.sliceQuery = sliceQuery; + } + + @Override + public RecordIterator<Entry> getEntries() { + ensureOpen(); + + if (sliceQuery == null) + throw new IllegalStateException("getEntries() requires SliceQuery to be set."); + + return new RecordIterator<Entry>() { + private final Iterator<Entry> columns = + CassandraHelper.makeEntryIterator(currentRow.getColumns(), + entryGetter, + sliceQuery.getSliceEnd(),sliceQuery.getLimit()); + + @Override + public boolean hasNext() { + ensureOpen(); + return columns.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return columns.next(); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean hasNext() { + ensureOpen(); + return rows.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + currentRow = rows.next(); + return StaticArrayBuffer.of(currentRow.getKey()); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java new file mode 100644 index 0000000..f73ec15 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreManager.java @@ -0,0 +1,672 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.netflix.astyanax.AstyanaxContext; +import com.netflix.astyanax.Cluster; +import com.netflix.astyanax.ColumnListMutation; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.MutationBatch; +import com.netflix.astyanax.connectionpool.Host; +import com.netflix.astyanax.connectionpool.NodeDiscoveryType; +import com.netflix.astyanax.connectionpool.RetryBackoffStrategy; +import com.netflix.astyanax.connectionpool.SSLConnectionContext; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl; +import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType; +import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor; +import com.netflix.astyanax.connectionpool.impl.ExponentialRetryBackoffStrategy; +import com.netflix.astyanax.connectionpool.impl.SimpleAuthenticationCredentials; +import com.netflix.astyanax.ddl.ColumnFamilyDefinition; +import com.netflix.astyanax.ddl.KeyspaceDefinition; +import com.netflix.astyanax.impl.AstyanaxConfigurationImpl; +import com.netflix.astyanax.model.ColumnFamily; +import com.netflix.astyanax.retry.RetryPolicy; +import com.netflix.astyanax.thrift.ThriftFamilyFactory; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.Entry; +import com.thinkaurelius.titan.diskstorage.EntryMetaData; +import com.thinkaurelius.titan.diskstorage.PermanentBackendException; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.StoreMetaData; +import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +@PreInitializeConfigOptions +public class AstyanaxStoreManager extends AbstractCassandraStoreManager { + + private static final Logger log = LoggerFactory.getLogger(AstyanaxStoreManager.class); + + //################### ASTYANAX SPECIFIC CONFIGURATION OPTIONS ###################### + + public static final ConfigNamespace ASTYANAX_NS = + new ConfigNamespace(CASSANDRA_NS, "astyanax", "Astyanax-specific Cassandra options"); + + /** + * Default name for the Cassandra cluster + * <p/> + */ + public static final ConfigOption<String> CLUSTER_NAME = + new ConfigOption<String>(ASTYANAX_NS, "cluster-name", + "Default name for the Cassandra cluster", + ConfigOption.Type.MASKABLE, "Titan Cluster"); + + /** + * Maximum pooled connections per host. + * <p/> + */ + public static final ConfigOption<Integer> MAX_CONNECTIONS_PER_HOST = + new ConfigOption<Integer>(ASTYANAX_NS, "max-connections-per-host", + "Maximum pooled connections per host", + ConfigOption.Type.MASKABLE, 32); + + /** + * Maximum open connections allowed in the pool (counting all hosts). + * <p/> + */ + public static final ConfigOption<Integer> MAX_CONNECTIONS = + new ConfigOption<Integer>(ASTYANAX_NS, "max-connections", + "Maximum open connections allowed in the pool (counting all hosts)", + ConfigOption.Type.MASKABLE, -1); + + /** + * Maximum number of operations allowed per connection before the connection is closed. + * <p/> + */ + public static final ConfigOption<Integer> MAX_OPERATIONS_PER_CONNECTION = + new ConfigOption<Integer>(ASTYANAX_NS, "max-operations-per-connection", + "Maximum number of operations allowed per connection before the connection is closed", + ConfigOption.Type.MASKABLE, 100 * 1000); + + /** + * Maximum pooled "cluster" connections per host. + * <p/> + * These connections are mostly idle and only used for DDL operations + * (like creating keyspaces). Titan doesn't need many of these connections + * in ordinary operation. + */ + public static final ConfigOption<Integer> MAX_CLUSTER_CONNECTIONS_PER_HOST = + new ConfigOption<Integer>(ASTYANAX_NS, "max-cluster-connections-per-host", + "Maximum pooled \"cluster\" connections per host", + ConfigOption.Type.MASKABLE, 3); + + /** + * How Astyanax discovers Cassandra cluster nodes. This must be one of the + * values of the Astyanax NodeDiscoveryType enum. + * <p/> + */ + public static final ConfigOption<String> NODE_DISCOVERY_TYPE = + new ConfigOption<String>(ASTYANAX_NS, "node-discovery-type", + "How Astyanax discovers Cassandra cluster nodes", + ConfigOption.Type.MASKABLE, "RING_DESCRIBE"); + + /** + * Astyanax specific host supplier useful only when discovery type set to DISCOVERY_SERVICE or TOKEN_AWARE. + * Excepts fully qualified class name which extends google.common.base.Supplier<List<Host>>. + */ + public static final ConfigOption<String> HOST_SUPPLIER = + new ConfigOption<String>(ASTYANAX_NS, "host-supplier", + "Host supplier to use when discovery type is set to DISCOVERY_SERVICE or TOKEN_AWARE", + ConfigOption.Type.MASKABLE, String.class); + + /** + * Astyanax's connection pooler implementation. This must be one of the + * values of the Astyanax ConnectionPoolType enum. + * <p/> + */ + public static final ConfigOption<String> CONNECTION_POOL_TYPE = + new ConfigOption<String>(ASTYANAX_NS, "connection-pool-type", + "Astyanax's connection pooler implementation", + ConfigOption.Type.MASKABLE, "TOKEN_AWARE"); + + /** + * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar + * but are used for distinct purposes. RetryPolicy is for retrying failed + * operations. RetryBackoffStrategy is for retrying attempts to talk to + * uncommunicative hosts. This config option controls RetryPolicy. + */ + public static final ConfigOption<String> RETRY_POLICY = + new ConfigOption<String>(ASTYANAX_NS, "retry-policy", + "Astyanax's retry policy implementation with configuration parameters", + ConfigOption.Type.MASKABLE, "com.netflix.astyanax.retry.BoundedExponentialBackoff,100,25000,8"); + + /** + * If non-null, this must be the fully-qualified classname (i.e. the + * complete package name, a dot, and then the class name) of an + * implementation of Astyanax's RetryBackoffStrategy interface. This string + * may be followed by a sequence of integers, separated from the full + * classname and from each other by commas; in this case, the integers are + * cast to native Java ints and passed to the class constructor as + * arguments. Here's an example setting that would instantiate an Astyanax + * FixedRetryBackoffStrategy with an delay interval of 1s and suspend time + * of 5s: + * <p/> + * <code> + * com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000 + * </code> + * <p/> + * If null, then Astyanax uses its default strategy, which is an + * ExponentialRetryBackoffStrategy instance. The instance parameters take + * Astyanax's built-in default values, which can be overridden via the + * following config keys: + * <ul> + * <li>{@link #RETRY_DELAY_SLICE}</li> + * <li>{@link #RETRY_MAX_DELAY_SLICE}</li> + * <li>{@link #RETRY_SUSPEND_WINDOW}</li> + * </ul> + * <p/> + * In Astyanax, RetryPolicy and RetryBackoffStrategy sound and look similar + * but are used for distinct purposes. RetryPolicy is for retrying failed + * operations. RetryBackoffStrategy is for retrying attempts to talk to + * uncommunicative hosts. This config option controls RetryBackoffStrategy. + */ + public static final ConfigOption<String> RETRY_BACKOFF_STRATEGY = + new ConfigOption<String>(ASTYANAX_NS, "retry-backoff-strategy", + "Astyanax's retry backoff strategy with configuration parameters", + ConfigOption.Type.MASKABLE, "com.netflix.astyanax.connectionpool.impl.FixedRetryBackoffStrategy,1000,5000"); + + /** + * Controls the retryDelaySlice parameter on Astyanax + * ConnectionPoolConfigurationImpl objects, which is in turn used by + * ExponentialRetryBackoffStrategy. See the code for + * {@link ConnectionPoolConfigurationImpl}, + * {@link ExponentialRetryBackoffStrategy}, and the javadoc for + * {@link #RETRY_BACKOFF_STRATEGY} for more information. + * <p/> + * This parameter is not meaningful for and has no effect on + * FixedRetryBackoffStrategy. + */ + public static final ConfigOption<Integer> RETRY_DELAY_SLICE = + new ConfigOption<Integer>(ASTYANAX_NS, "retry-delay-slice", + "Astyanax's connection pool \"retryDelaySlice\" parameter", + ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_DELAY_SLICE); + /** + * Controls the retryMaxDelaySlice parameter on Astyanax + * ConnectionPoolConfigurationImpl objects, which is in turn used by + * ExponentialRetryBackoffStrategy. See the code for + * {@link ConnectionPoolConfigurationImpl}, + * {@link ExponentialRetryBackoffStrategy}, and the javadoc for + * {@link #RETRY_BACKOFF_STRATEGY} for more information. + * <p/> + * This parameter is not meaningful for and has no effect on + * FixedRetryBackoffStrategy. + */ + public static final ConfigOption<Integer> RETRY_MAX_DELAY_SLICE = + new ConfigOption<Integer>(ASTYANAX_NS, "retry-max-delay-slice", + "Astyanax's connection pool \"retryMaxDelaySlice\" parameter", + ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_MAX_DELAY_SLICE); + + /** + * Controls the retrySuspendWindow parameter on Astyanax + * ConnectionPoolConfigurationImpl objects, which is in turn used by + * ExponentialRetryBackoffStrategy. See the code for + * {@link ConnectionPoolConfigurationImpl}, + * {@link ExponentialRetryBackoffStrategy}, and the javadoc for + * {@link #RETRY_BACKOFF_STRATEGY} for more information. + * <p/> + * This parameter is not meaningful for and has no effect on + * FixedRetryBackoffStrategy. + */ + public static final ConfigOption<Integer> RETRY_SUSPEND_WINDOW = + new ConfigOption<Integer>(ASTYANAX_NS, "retry-suspend-window", + "Astyanax's connection pool \"retryMaxDelaySlice\" parameter", + ConfigOption.Type.MASKABLE, ConnectionPoolConfigurationImpl.DEFAULT_RETRY_SUSPEND_WINDOW); + + /** + * Controls the frame size of thrift sockets created by Astyanax. + */ + public static final ConfigOption<Integer> THRIFT_FRAME_SIZE = + new ConfigOption<Integer>(ASTYANAX_NS, "frame-size", + "The thrift frame size in mega bytes", ConfigOption.Type.MASKABLE, 15); + + public static final ConfigOption<String> LOCAL_DATACENTER = + new ConfigOption<String>(ASTYANAX_NS, "local-datacenter", + "The name of the local or closest Cassandra datacenter. When set and not whitespace, " + + "this value will be passed into ConnectionPoolConfigurationImpl.setLocalDatacenter. " + + "When unset or set to whitespace, setLocalDatacenter will not be invoked.", + /* It's between either LOCAL or MASKABLE. MASKABLE could be useful for cases where + all the Titan instances are closest to the same Cassandra DC. */ + ConfigOption.Type.MASKABLE, String.class); + + private final String clusterName; + + private final AstyanaxContext<Keyspace> keyspaceContext; + private final AstyanaxContext<Cluster> clusterContext; + + private final RetryPolicy retryPolicy; + + private final int retryDelaySlice; + private final int retryMaxDelaySlice; + private final int retrySuspendWindow; + private final RetryBackoffStrategy retryBackoffStrategy; + + private final String localDatacenter; + + private final Map<String, AstyanaxKeyColumnValueStore> openStores; + + public AstyanaxStoreManager(Configuration config) throws BackendException { + super(config); + + this.clusterName = config.get(CLUSTER_NAME); + + retryDelaySlice = config.get(RETRY_DELAY_SLICE); + retryMaxDelaySlice = config.get(RETRY_MAX_DELAY_SLICE); + retrySuspendWindow = config.get(RETRY_SUSPEND_WINDOW); + retryBackoffStrategy = getRetryBackoffStrategy(config.get(RETRY_BACKOFF_STRATEGY)); + retryPolicy = getRetryPolicy(config.get(RETRY_POLICY)); + + localDatacenter = config.has(LOCAL_DATACENTER) ? + config.get(LOCAL_DATACENTER) : ""; + + final int maxConnsPerHost = config.get(MAX_CONNECTIONS_PER_HOST); + + final int maxClusterConnsPerHost = config.get(MAX_CLUSTER_CONNECTIONS_PER_HOST); + + this.clusterContext = createCluster(getContextBuilder(config, maxClusterConnsPerHost, "Cluster")); + + ensureKeyspaceExists(clusterContext.getClient()); + + this.keyspaceContext = getContextBuilder(config, maxConnsPerHost, "Keyspace").buildKeyspace(ThriftFamilyFactory.getInstance()); + this.keyspaceContext.start(); + + openStores = new HashMap<String, AstyanaxKeyColumnValueStore>(8); + } + + @Override + public Deployment getDeployment() { + return Deployment.REMOTE; // TODO + } + + @Override + @SuppressWarnings("unchecked") + public IPartitioner getCassandraPartitioner() throws BackendException { + Cluster cl = clusterContext.getClient(); + try { + return FBUtilities.newPartitioner(cl.describePartitioner()); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } catch (ConfigurationException e) { + throw new PermanentBackendException(e); + } + } + + @Override + public String toString() { + return "astyanax" + super.toString(); + } + + @Override + public void close() { + // Shutdown the Astyanax contexts + openStores.clear(); + keyspaceContext.shutdown(); + clusterContext.shutdown(); + } + + @Override + public synchronized AstyanaxKeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException { + if (openStores.containsKey(name)) return openStores.get(name); + else { + ensureColumnFamilyExists(name); + AstyanaxKeyColumnValueStore store = new AstyanaxKeyColumnValueStore(name, keyspaceContext.getClient(), this, retryPolicy); + openStores.put(name, store); + return store; + } + } + + @Override + public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> batch, StoreTransaction txh) throws BackendException { + MutationBatch m = keyspaceContext.getClient().prepareMutationBatch().withAtomicBatch(atomicBatch) + .setConsistencyLevel(getTx(txh).getWriteConsistencyLevel().getAstyanax()) + .withRetryPolicy(retryPolicy.duplicate()); + + final MaskedTimestamp commitTime = new MaskedTimestamp(txh); + + for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> batchentry : batch.entrySet()) { + String storeName = batchentry.getKey(); + Preconditions.checkArgument(openStores.containsKey(storeName), "Store cannot be found: " + storeName); + + ColumnFamily<ByteBuffer, ByteBuffer> columnFamily = openStores.get(storeName).getColumnFamily(); + + Map<StaticBuffer, KCVMutation> mutations = batchentry.getValue(); + for (Map.Entry<StaticBuffer, KCVMutation> ent : mutations.entrySet()) { + // The CLMs for additions and deletions are separated because + // Astyanax's operation timestamp cannot be set on a per-delete + // or per-addition basis. + KCVMutation titanMutation = ent.getValue(); + ByteBuffer key = ent.getKey().asByteBuffer(); + + if (titanMutation.hasDeletions()) { + ColumnListMutation<ByteBuffer> dels = m.withRow(columnFamily, key); + dels.setTimestamp(commitTime.getDeletionTime(times)); + + for (StaticBuffer b : titanMutation.getDeletions()) + dels.deleteColumn(b.as(StaticBuffer.BB_FACTORY)); + } + + if (titanMutation.hasAdditions()) { + ColumnListMutation<ByteBuffer> upds = m.withRow(columnFamily, key); + upds.setTimestamp(commitTime.getAdditionTime(times)); + + for (Entry e : titanMutation.getAdditions()) { + Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL); + + if (null != ttl && ttl > 0) { + upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY), ttl); + } else { + upds.putColumn(e.getColumnAs(StaticBuffer.BB_FACTORY), e.getValueAs(StaticBuffer.BB_FACTORY)); + } + } + } + } + } + + try { + m.execute(); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + + sleepAfterWrite(txh, commitTime); + } + + @Override + public List<KeyRange> getLocalKeyPartition() throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public void clearStorage() throws BackendException { + try { + Cluster cluster = clusterContext.getClient(); + + Keyspace ks = cluster.getKeyspace(keySpaceName); + + // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests). + // This is called on per test setup basis to make sure that previous test cleaned + // everything up, so first invocation would always fail as Keyspace doesn't yet exist. + if (ks == null) + return; + + for (ColumnFamilyDefinition cf : cluster.describeKeyspace(keySpaceName).getColumnFamilyList()) { + ks.truncateColumnFamily(new ColumnFamily<Object, Object>(cf.getName(), null, null)); + } + } catch (ConnectionException e) { + throw new PermanentBackendException(e); + } + } + + private void ensureColumnFamilyExists(String name) throws BackendException { + ensureColumnFamilyExists(name, "org.apache.cassandra.db.marshal.BytesType"); + } + + private void ensureColumnFamilyExists(String name, String comparator) throws BackendException { + Cluster cl = clusterContext.getClient(); + try { + KeyspaceDefinition ksDef = cl.describeKeyspace(keySpaceName); + boolean found = false; + if (null != ksDef) { + for (ColumnFamilyDefinition cfDef : ksDef.getColumnFamilyList()) { + found |= cfDef.getName().equals(name); + } + } + if (!found) { + ColumnFamilyDefinition cfDef = + cl.makeColumnFamilyDefinition() + .setName(name) + .setKeyspace(keySpaceName) + .setComparatorType(comparator); + + ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>(); + + if (compressionEnabled) { + compressionOptions.put("sstable_compression", compressionClass) + .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB)); + } + + cl.addColumnFamily(cfDef.setCompressionOptions(compressionOptions.build())); + } + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + } + + private static AstyanaxContext<Cluster> createCluster(AstyanaxContext.Builder cb) { + AstyanaxContext<Cluster> clusterCtx = cb.buildCluster(ThriftFamilyFactory.getInstance()); + clusterCtx.start(); + + return clusterCtx; + } + + private AstyanaxContext.Builder getContextBuilder(Configuration config, int maxConnsPerHost, String usedFor) { + + final ConnectionPoolType poolType = ConnectionPoolType.valueOf(config.get(CONNECTION_POOL_TYPE)); + + final NodeDiscoveryType discType = NodeDiscoveryType.valueOf(config.get(NODE_DISCOVERY_TYPE)); + + final int maxConnections = config.get(MAX_CONNECTIONS); + + final int maxOperationsPerConnection = config.get(MAX_OPERATIONS_PER_CONNECTION); + + final int connectionTimeout = (int) connectionTimeoutMS.toMillis(); + + ConnectionPoolConfigurationImpl cpool = + new ConnectionPoolConfigurationImpl(usedFor + "TitanConnectionPool") + .setPort(port) + .setMaxOperationsPerConnection(maxOperationsPerConnection) + .setMaxConnsPerHost(maxConnsPerHost) + .setRetryDelaySlice(retryDelaySlice) + .setRetryMaxDelaySlice(retryMaxDelaySlice) + .setRetrySuspendWindow(retrySuspendWindow) + .setSocketTimeout(connectionTimeout) + .setConnectTimeout(connectionTimeout) + .setSeeds(StringUtils.join(hostnames, ",")); + + if (null != retryBackoffStrategy) { + cpool.setRetryBackoffStrategy(retryBackoffStrategy); + log.debug("Custom RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy()); + } else { + log.debug("Default RetryBackoffStrategy {}", cpool.getRetryBackoffStrategy()); + } + + if (StringUtils.isNotBlank(localDatacenter)) { + cpool.setLocalDatacenter(localDatacenter); + log.debug("Set local datacenter: {}", cpool.getLocalDatacenter()); + } + + AstyanaxConfigurationImpl aconf = + new AstyanaxConfigurationImpl() + .setConnectionPoolType(poolType) + .setDiscoveryType(discType) + .setTargetCassandraVersion("1.2") + .setMaxThriftSize(thriftFrameSizeBytes); + + if (0 < maxConnections) { + cpool.setMaxConns(maxConnections); + } + + if (hasAuthentication()) { + cpool.setAuthenticationCredentials(new SimpleAuthenticationCredentials(username, password)); + } + + if (config.get(SSL_ENABLED)) { + cpool.setSSLConnectionContext(new SSLConnectionContext(config.get(SSL_TRUSTSTORE_LOCATION), config.get(SSL_TRUSTSTORE_PASSWORD))); + } + + AstyanaxContext.Builder ctxBuilder = new AstyanaxContext.Builder(); + + // Standard context builder options + ctxBuilder + .forCluster(clusterName) + .forKeyspace(keySpaceName) + .withAstyanaxConfiguration(aconf) + .withConnectionPoolConfiguration(cpool) + .withConnectionPoolMonitor(new CountingConnectionPoolMonitor()); + + // Conditional context builder option: host supplier + if (config.has(HOST_SUPPLIER)) { + String hostSupplier = config.get(HOST_SUPPLIER); + Supplier<List<Host>> supplier = null; + if (hostSupplier != null) { + try { + supplier = (Supplier<List<Host>>) Class.forName(hostSupplier).newInstance(); + ctxBuilder.withHostSupplier(supplier); + } catch (Exception e) { + log.warn("Problem with host supplier class " + hostSupplier + ", going to use default.", e); + } + } + } + + return ctxBuilder; + } + + private void ensureKeyspaceExists(Cluster cl) throws BackendException { + KeyspaceDefinition ksDef; + + try { + ksDef = cl.describeKeyspace(keySpaceName); + + if (null != ksDef && ksDef.getName().equals(keySpaceName)) { + log.debug("Found keyspace {}", keySpaceName); + return; + } + } catch (ConnectionException e) { + log.debug("Failed to describe keyspace {}", keySpaceName); + } + + log.debug("Creating keyspace {}...", keySpaceName); + try { + ksDef = cl.makeKeyspaceDefinition() + .setName(keySpaceName) + .setStrategyClass(storageConfig.get(REPLICATION_STRATEGY)) + .setStrategyOptions(strategyOptions); + cl.addKeyspace(ksDef); + + log.debug("Created keyspace {}", keySpaceName); + } catch (ConnectionException e) { + log.debug("Failed to create keyspace {}", keySpaceName); + throw new TemporaryBackendException(e); + } + } + + private static RetryBackoffStrategy getRetryBackoffStrategy(String desc) throws PermanentBackendException { + if (null == desc) + return null; + + String[] tokens = desc.split(","); + String policyClassName = tokens[0]; + int argCount = tokens.length - 1; + Integer[] args = new Integer[argCount]; + + for (int i = 1; i < tokens.length; i++) { + args[i - 1] = Integer.valueOf(tokens[i]); + } + + try { + RetryBackoffStrategy rbs = instantiate(policyClassName, args, desc); + log.debug("Instantiated RetryBackoffStrategy object {} from config string \"{}\"", rbs, desc); + return rbs; + } catch (Exception e) { + throw new PermanentBackendException("Failed to instantiate Astyanax RetryBackoffStrategy implementation", e); + } + } + + private static RetryPolicy getRetryPolicy(String serializedRetryPolicy) throws BackendException { + String[] tokens = serializedRetryPolicy.split(","); + String policyClassName = tokens[0]; + int argCount = tokens.length - 1; + Integer[] args = new Integer[argCount]; + for (int i = 1; i < tokens.length; i++) { + args[i - 1] = Integer.valueOf(tokens[i]); + } + + try { + RetryPolicy rp = instantiate(policyClassName, args, serializedRetryPolicy); + log.debug("Instantiated RetryPolicy object {} from config string \"{}\"", rp, serializedRetryPolicy); + return rp; + } catch (Exception e) { + throw new PermanentBackendException("Failed to instantiate Astyanax Retry Policy class", e); + } + } + + @SuppressWarnings("unchecked") + private static <V> V instantiate(String policyClassName, Integer[] args, String raw) throws Exception { + for (Constructor<?> con : Class.forName(policyClassName).getConstructors()) { + Class<?>[] parameterTypes = con.getParameterTypes(); + + // match constructor by number of arguments first + if (args.length != parameterTypes.length) + continue; + + // check if the constructor parameter types are compatible with argument types (which are integer) + // note that we allow long.class arguments too because integer is cast to long by runtime. + boolean intsOrLongs = true; + for (Class<?> pc : parameterTypes) { + if (!pc.equals(int.class) && !pc.equals(long.class)) { + intsOrLongs = false; + break; + } + } + + // we found a constructor with required number of parameters but times didn't match, let's carry on + if (!intsOrLongs) + continue; + + if (log.isDebugEnabled()) + log.debug("About to instantiate class {} with {} arguments", con.toString(), args.length); + + return (V) con.newInstance(args); + } + + throw new Exception("Failed to identify a class matching the Astyanax Retry Policy config string \"" + raw + "\""); + } + + @Override + public Map<String, String> getCompressionOptions(String cf) throws BackendException { + try { + Keyspace k = keyspaceContext.getClient(); + + KeyspaceDefinition kdef = k.describeKeyspace(); + + if (null == kdef) { + throw new PermanentBackendException("Keyspace " + k.getKeyspaceName() + " is undefined"); + } + + ColumnFamilyDefinition cfdef = kdef.getColumnFamily(cf); + + if (null == cfdef) { + throw new PermanentBackendException("Column family " + cf + " is undefined"); + } + + return cfdef.getCompressionOptions(); + } catch (ConnectionException e) { + throw new PermanentBackendException(e); + } + } +} + + diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java new file mode 100644 index 0000000..67eec14 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedKeyColumnValueStore.java @@ -0,0 +1,527 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.util.RecordIterator; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.IsBootstrappingException; +import org.apache.cassandra.exceptions.RequestTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.thrift.SlicePredicate; +import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.commons.lang.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +public class CassandraEmbeddedKeyColumnValueStore implements KeyColumnValueStore { + + private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedKeyColumnValueStore.class); + + private final String keyspace; + private final String columnFamily; + private final CassandraEmbeddedStoreManager storeManager; + private final TimestampProvider times; + private final CassandraEmbeddedGetter entryGetter; + + public CassandraEmbeddedKeyColumnValueStore( + String keyspace, + String columnFamily, + CassandraEmbeddedStoreManager storeManager) throws RuntimeException { + this.keyspace = keyspace; + this.columnFamily = columnFamily; + this.storeManager = storeManager; + this.times = this.storeManager.getTimestampProvider(); + entryGetter = new CassandraEmbeddedGetter(storeManager.getMetaDataSchema(columnFamily),times); + } + + @Override + public void close() throws BackendException { + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, + StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException { + IPartitioner partitioner = StorageService.getPartitioner(); + + // see rant about this in Astyanax implementation + if (partitioner instanceof RandomPartitioner || partitioner instanceof Murmur3Partitioner) + throw new PermanentBackendException("This operation is only supported when byte-ordered partitioner is used."); + + return new RowIterator(keyRangeQuery, storeManager.getPageSize(), txh); + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { + return new RowIterator(getMinimumToken(), getMaximumToken(), query, storeManager.getPageSize(), txh); + } + + + /** + * Create a RangeSliceCommand and run it against the StorageProxy. + * <p> + * To match the behavior of the standard Cassandra thrift API endpoint, the + * {@code nowMillis} argument should be the number of milliseconds since the + * UNIX Epoch (e.g. System.currentTimeMillis() or equivalent obtained + * through a {@link TimestampProvider}). This is per + * {@link org.apache.cassandra.thrift.CassandraServer#get_range_slices(ColumnParent, SlicePredicate, KeyRange, ConsistencyLevel)}, + * which passes the server's System.currentTimeMillis() to the + * {@code RangeSliceCommand} constructor. + */ + private List<Row> getKeySlice(Token start, + Token end, + @Nullable SliceQuery sliceQuery, + int pageSize, + long nowMillis) throws BackendException { + IPartitioner partitioner = StorageService.getPartitioner(); + + SliceRange columnSlice = new SliceRange(); + if (sliceQuery == null) { + columnSlice.setStart(ArrayUtils.EMPTY_BYTE_ARRAY) + .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY) + .setCount(5); + } else { + columnSlice.setStart(sliceQuery.getSliceStart().asByteBuffer()) + .setFinish(sliceQuery.getSliceEnd().asByteBuffer()) + .setCount(sliceQuery.hasLimit() ? sliceQuery.getLimit() : Integer.MAX_VALUE); + } + /* Note: we need to fetch columns for each row as well to remove "range ghosts" */ + SlicePredicate predicate = new SlicePredicate().setSlice_range(columnSlice); + + RowPosition startPosition = start.minKeyBound(partitioner); + RowPosition endPosition = end.minKeyBound(partitioner); + + List<Row> rows; + + try { + CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, columnFamily); + IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, cfm, null); + + RangeSliceCommand cmd = new RangeSliceCommand(keyspace, columnFamily, nowMillis, filter, new Bounds<RowPosition>(startPosition, endPosition), pageSize); + + rows = StorageProxy.getRangeSlice(cmd, ConsistencyLevel.QUORUM); + } catch (Exception e) { + throw new PermanentBackendException(e); + } + + return rows; + } + + @Override + public String getName() { + return columnFamily; + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + + /** + * This timestamp mimics the timestamp used by + * {@link org.apache.cassandra.thrift.CassandraServer#get(ByteBuffer,ColumnPath,ConsistencyLevel)}. + * + * That method passes the server's System.currentTimeMillis() to + * {@link ReadCommand#create(String, ByteBuffer, String, long, IDiskAtomFilter)}. + * {@code create(...)} in turn passes that timestamp to the SliceFromReadCommand constructor. + */ + final long nowMillis = times.getTime().toEpochMilli(); + Composite startComposite = CellNames.simpleDense(query.getSliceStart().asByteBuffer()); + Composite endComposite = CellNames.simpleDense(query.getSliceEnd().asByteBuffer()); + SliceQueryFilter sqf = new SliceQueryFilter(startComposite, endComposite, + false, query.getLimit() + (query.hasLimit()?1:0)); + ReadCommand sliceCmd = new SliceFromReadCommand(keyspace, query.getKey().asByteBuffer(), columnFamily, nowMillis, sqf); + + List<Row> slice = read(sliceCmd, getTx(txh).getReadConsistencyLevel().getDB()); + + if (null == slice || 0 == slice.size()) + return EntryList.EMPTY_LIST; + + int sliceSize = slice.size(); + if (1 < sliceSize) + throw new PermanentBackendException("Received " + sliceSize + " rows for single key"); + + Row r = slice.get(0); + + if (null == r) { + log.warn("Null Row object retrieved from Cassandra StorageProxy"); + return EntryList.EMPTY_LIST; + } + + ColumnFamily cf = r.cf; + + if (null == cf) { + log.debug("null ColumnFamily (\"{}\")", columnFamily); + return EntryList.EMPTY_LIST; + } + + if (cf.isMarkedForDelete()) + return EntryList.EMPTY_LIST; + + return CassandraHelper.makeEntryList( + Iterables.filter(cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)), + entryGetter, + query.getSliceEnd(), + query.getLimit()); + + } + + private class FilterDeletedColumns implements Predicate<Cell> { + + private final long tsMillis; + private final int tsSeconds; + + private FilterDeletedColumns(long tsMillis) { + this.tsMillis = tsMillis; + this.tsSeconds = (int)(this.tsMillis / 1000L); + } + + @Override + public boolean apply(Cell input) { + if (!input.isLive(tsMillis)) + return false; + + // Don't do this. getTimeToLive() is a duration divorced from any particular clock. + // For instance, if TTL=10 seconds, getTimeToLive() will have value 10 (not 10 + epoch seconds), and + // this will always return false. + //if (input instanceof ExpiringCell) + // return tsSeconds < ((ExpiringCell)input).getTimeToLive(); + + return true; + } + } + + @Override + public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, + List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new + KCVMutation(additions, deletions)); + mutateMany(mutations, txh); + } + + + public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, + StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh); + } + + private static List<Row> read(ReadCommand cmd, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException { + ArrayList<ReadCommand> cmdHolder = new ArrayList<ReadCommand>(1); + cmdHolder.add(cmd); + return read(cmdHolder, clvl); + } + + private static List<Row> read(List<ReadCommand> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException { + try { + return StorageProxy.read(cmds, clvl); + } catch (UnavailableException e) { + throw new TemporaryBackendException(e); + } catch (RequestTimeoutException e) { + throw new PermanentBackendException(e); + } catch (IsBootstrappingException e) { + throw new TemporaryBackendException(e); + } catch (InvalidRequestException e) { + throw new PermanentBackendException(e); + } + } + + private static class CassandraEmbeddedGetter implements StaticArrayEntry.GetColVal<Cell,ByteBuffer> { + + private final EntryMetaData[] schema; + private final TimestampProvider times; + + private CassandraEmbeddedGetter(EntryMetaData[] schema, TimestampProvider times) { + this.schema = schema; + this.times = times; + } + + @Override + public ByteBuffer getColumn(Cell element) { + return org.apache.cassandra.utils.ByteBufferUtil.clone(element.name().toByteBuffer()); + } + + @Override + public ByteBuffer getValue(Cell element) { + return org.apache.cassandra.utils.ByteBufferUtil.clone(element.value()); + } + + @Override + public EntryMetaData[] getMetaSchema(Cell element) { + return schema; + } + + @Override + public Object getMetaData(Cell element, EntryMetaData meta) { + switch (meta) { + case TIMESTAMP: + return element.timestamp(); + case TTL: + return ((element instanceof ExpiringCell) + ? ((ExpiringCell) element).getTimeToLive() + : 0); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } + + private class RowIterator implements KeyIterator { + private final Token maximumToken; + private final SliceQuery sliceQuery; + private final StoreTransaction txh; + + /** + * This RowIterator will use this timestamp for its entire lifetime, + * even if the iterator runs more than one distinct slice query while + * paging. <b>This field must be in units of milliseconds since + * the UNIX Epoch</b>. + * <p> + * This timestamp is passed to three methods/constructors: + * <ul> + * <li>{@link org.apache.cassandra.db.Column#isMarkedForDelete(long now)}</li> + * <li>{@link org.apache.cassandra.db.ColumnFamily#hasOnlyTombstones(long)}</li> + * <li> + * the {@link RangeSliceCommand} constructor via the last argument + * to {@link CassandraEmbeddedKeyColumnValueStore#getKeySlice(Token, Token, SliceQuery, int, long)} + * </li> + * </ul> + * The second list entry just calls the first and almost doesn't deserve + * a mention at present, but maybe the implementation will change in the future. + * <p> + * When this value needs to be compared to TTL seconds expressed in seconds, + * Cassandra internals do the conversion. + * Consider {@link ExpiringColumn#isMarkedForDelete(long)}, which is implemented, + * as of 2.0.6, by the following one-liner: + * <p> + * {@code return (int) (now / 1000) >= getLocalDeletionTime()} + * <p> + * The {@code now / 1000} does the conversion from milliseconds to seconds + * (the units of getLocalDeletionTime()). + */ + private final long nowMillis; + + private Iterator<Row> keys; + private ByteBuffer lastSeenKey = null; + private Row currentRow; + private int pageSize; + + private boolean isClosed; + + public RowIterator(KeyRangeQuery keyRangeQuery, int pageSize, StoreTransaction txh) throws BackendException { + this(StorageService.getPartitioner().getToken(keyRangeQuery.getKeyStart().asByteBuffer()), + StorageService.getPartitioner().getToken(keyRangeQuery.getKeyEnd().asByteBuffer()), + keyRangeQuery, + pageSize, + txh); + } + + public RowIterator(Token minimum, Token maximum, SliceQuery sliceQuery, int pageSize, StoreTransaction txh) throws BackendException { + this.pageSize = pageSize; + this.sliceQuery = sliceQuery; + this.maximumToken = maximum; + this.txh = txh; + this.nowMillis = times.getTime().toEpochMilli(); + this.keys = getRowsIterator(getKeySlice(minimum, maximum, sliceQuery, pageSize, nowMillis)); + } + + @Override + public boolean hasNext() { + try { + return hasNextInternal(); + } catch (BackendException e) { + throw new RuntimeException(e); + } + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + if (!hasNext()) + throw new NoSuchElementException(); + + currentRow = keys.next(); + ByteBuffer currentKey = currentRow.key.getKey().duplicate(); + + try { + return StaticArrayBuffer.of(currentKey); + } finally { + lastSeenKey = currentKey; + } + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public RecordIterator<Entry> getEntries() { + ensureOpen(); + + if (sliceQuery == null) + throw new IllegalStateException("getEntries() requires SliceQuery to be set."); + + return new RecordIterator<Entry>() { + final Iterator<Entry> columns = CassandraHelper.makeEntryIterator( + Iterables.filter(currentRow.cf.getSortedColumns(), new FilterDeletedColumns(nowMillis)), + entryGetter, + sliceQuery.getSliceEnd(), + sliceQuery.getLimit()); + + //cfToEntries(currentRow.cf, sliceQuery).iterator(); + + @Override + public boolean hasNext() { + ensureOpen(); + return columns.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return columns.next(); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + + private final boolean hasNextInternal() throws BackendException { + ensureOpen(); + + if (keys == null) + return false; + + boolean hasNext = keys.hasNext(); + + if (!hasNext && lastSeenKey != null) { + Token lastSeenToken = StorageService.getPartitioner().getToken(lastSeenKey.duplicate()); + + // let's check if we reached key upper bound already so we can skip one useless call to Cassandra + if (maximumToken != getMinimumToken() && lastSeenToken.equals(maximumToken)) { + return false; + } + + List<Row> newKeys = getKeySlice(StorageService.getPartitioner().getToken(lastSeenKey), maximumToken, sliceQuery, pageSize, nowMillis); + + keys = getRowsIterator(newKeys, lastSeenKey); + hasNext = keys.hasNext(); + } + + return hasNext; + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + + private Iterator<Row> getRowsIterator(List<Row> rows) { + if (rows == null) + return null; + + return Iterators.filter(rows.iterator(), new Predicate<Row>() { + @Override + public boolean apply(@Nullable Row row) { + // The hasOnlyTombstones(x) call below ultimately calls Column.isMarkedForDelete(x) + return !(row == null || row.cf == null || row.cf.isMarkedForDelete() || row.cf.hasOnlyTombstones(nowMillis)); + } + }); + } + + private Iterator<Row> getRowsIterator(List<Row> rows, final ByteBuffer exceptKey) { + Iterator<Row> rowIterator = getRowsIterator(rows); + + if (rowIterator == null) + return null; + + return Iterators.filter(rowIterator, new Predicate<Row>() { + @Override + public boolean apply(@Nullable Row row) { + return row != null && !row.key.getKey().equals(exceptKey); + } + }); + } + } + + private static Token getMinimumToken() throws PermanentBackendException { + IPartitioner partitioner = StorageService.getPartitioner(); + + if (partitioner instanceof RandomPartitioner) { + return ((RandomPartitioner) partitioner).getMinimumToken(); + } else if (partitioner instanceof Murmur3Partitioner) { + return ((Murmur3Partitioner) partitioner).getMinimumToken(); + } else if (partitioner instanceof ByteOrderedPartitioner) { + //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys) + return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.zeroByteBuffer(8)); + } else { + throw new PermanentBackendException("Unsupported partitioner: " + partitioner); + } + } + + private static Token getMaximumToken() throws PermanentBackendException { + IPartitioner partitioner = StorageService.getPartitioner(); + + if (partitioner instanceof RandomPartitioner) { + return new BigIntegerToken(RandomPartitioner.MAXIMUM); + } else if (partitioner instanceof Murmur3Partitioner) { + return new LongToken(Murmur3Partitioner.MAXIMUM); + } else if (partitioner instanceof ByteOrderedPartitioner) { + //TODO: This makes the assumption that its an EdgeStore (i.e. 8 byte keys) + return new BytesToken(com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil.oneByteBuffer(8)); + } else { + throw new PermanentBackendException("Unsupported partitioner: " + partitioner); + } + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java new file mode 100644 index 0000000..cecc92f --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/CassandraEmbeddedStoreManager.java @@ -0,0 +1,399 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.TimeoutException; + +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.util.ByteBufferUtil; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; + +import org.apache.cassandra.cache.CachingOptions; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SliceByNamesReadCommand; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.scheduler.IRequestScheduler; +import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.StorageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraDaemonWrapper; + +public class CassandraEmbeddedStoreManager extends AbstractCassandraStoreManager { + + private static final Logger log = LoggerFactory.getLogger(CassandraEmbeddedStoreManager.class); + + /** + * The default value for + * {@link GraphDatabaseConfiguration#STORAGE_CONF_FILE}. + */ + public static final String CASSANDRA_YAML_DEFAULT = "./conf/cassandra.yaml"; + + private final Map<String, CassandraEmbeddedKeyColumnValueStore> openStores; + + private final IRequestScheduler requestScheduler; + + public CassandraEmbeddedStoreManager(Configuration config) throws BackendException { + super(config); + + String cassandraConfig = CASSANDRA_YAML_DEFAULT; + if (config.has(GraphDatabaseConfiguration.STORAGE_CONF_FILE)) { + cassandraConfig = config.get(GraphDatabaseConfiguration.STORAGE_CONF_FILE); + } + + assert cassandraConfig != null && !cassandraConfig.isEmpty(); + + File ccf = new File(cassandraConfig); + + if (ccf.exists() && ccf.isAbsolute()) { + cassandraConfig = "file://" + cassandraConfig; + log.debug("Set cassandra config string \"{}\"", cassandraConfig); + } + + CassandraDaemonWrapper.start(cassandraConfig); + + this.openStores = new HashMap<String, CassandraEmbeddedKeyColumnValueStore>(8); + this.requestScheduler = DatabaseDescriptor.getRequestScheduler(); + } + + @Override + public Deployment getDeployment() { + return Deployment.EMBEDDED; + } + + @Override + @SuppressWarnings("unchecked") + public IPartitioner getCassandraPartitioner() + throws BackendException { + try { + return StorageService.getPartitioner(); + } catch (Exception e) { + log.warn("Could not read local token range: {}", e); + throw new PermanentBackendException("Could not read partitioner information on cluster", e); + } + } + + @Override + public String toString() { + return "embeddedCassandra" + super.toString(); + } + + @Override + public void close() { + openStores.clear(); + CassandraDaemonWrapper.stop(); + } + + @Override + public synchronized KeyColumnValueStore openDatabase(String name, StoreMetaData.Container metaData) throws BackendException { + if (openStores.containsKey(name)) + return openStores.get(name); + + // Ensure that both the keyspace and column family exist + ensureKeyspaceExists(keySpaceName); + ensureColumnFamilyExists(keySpaceName, name); + + CassandraEmbeddedKeyColumnValueStore store = new CassandraEmbeddedKeyColumnValueStore(keySpaceName, name, this); + openStores.put(name, store); + return store; + } + + /* + * Raw type warnings are suppressed in this method because + * {@link StorageService#getLocalPrimaryRanges(String)} returns a raw + * (unparameterized) type. + */ + public List<KeyRange> getLocalKeyPartition() throws BackendException { + ensureKeyspaceExists(keySpaceName); + + @SuppressWarnings("rawtypes") + Collection<Range<Token>> ranges = StorageService.instance.getPrimaryRanges(keySpaceName); + + List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size()); + + for (@SuppressWarnings("rawtypes") Range<Token> range : ranges) { + keyRanges.add(CassandraHelper.transformRange(range)); + } + + return keyRanges; + } + + /* + * This implementation can't handle counter columns. + * + * The private method internal_batch_mutate in CassandraServer as of 1.2.0 + * provided most of the following method after transaction handling. + */ + @Override + public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException { + Preconditions.checkNotNull(mutations); + + final MaskedTimestamp commitTime = new MaskedTimestamp(txh); + + int size = 0; + for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size(); + Map<StaticBuffer, org.apache.cassandra.db.Mutation> rowMutations = new HashMap<>(size); + + for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> mutEntry : mutations.entrySet()) { + String columnFamily = mutEntry.getKey(); + for (Map.Entry<StaticBuffer, KCVMutation> titanMutation : mutEntry.getValue().entrySet()) { + StaticBuffer key = titanMutation.getKey(); + KCVMutation mut = titanMutation.getValue(); + + org.apache.cassandra.db.Mutation rm = rowMutations.get(key); + if (rm == null) { + rm = new org.apache.cassandra.db.Mutation(keySpaceName, key.asByteBuffer()); + rowMutations.put(key, rm); + } + + if (mut.hasAdditions()) { + for (Entry e : mut.getAdditions()) { + Integer ttl = (Integer) e.getMetaData().get(EntryMetaData.TTL); + + if (null != ttl && ttl > 0) { + rm.add(columnFamily, CellNames.simpleDense(e.getColumnAs(StaticBuffer.BB_FACTORY)), + e.getValueAs(StaticBuffer.BB_FACTORY), commitTime.getAdditionTime(times), ttl); + } else { + rm.add(columnFamily, CellNames.simpleDense(e.getColumnAs(StaticBuffer.BB_FACTORY)), + e.getValueAs(StaticBuffer.BB_FACTORY), commitTime.getAdditionTime(times)); + } + } + } + + if (mut.hasDeletions()) { + for (StaticBuffer col : mut.getDeletions()) { + rm.delete(columnFamily, CellNames.simpleDense(col.as(StaticBuffer.BB_FACTORY)), + commitTime.getDeletionTime(times)); + } + } + + } + } + + mutate(new ArrayList<org.apache.cassandra.db.Mutation>(rowMutations.values()), getTx(txh).getWriteConsistencyLevel().getDB()); + + sleepAfterWrite(txh, commitTime); + } + + private void mutate(List<org.apache.cassandra.db.Mutation> cmds, org.apache.cassandra.db.ConsistencyLevel clvl) throws BackendException { + try { + schedule(DatabaseDescriptor.getRpcTimeout()); + try { + if (atomicBatch) { + StorageProxy.mutateAtomically(cmds, clvl); + } else { + StorageProxy.mutate(cmds, clvl); + } + } catch (RequestExecutionException e) { + throw new TemporaryBackendException(e); + } finally { + release(); + } + } catch (TimeoutException ex) { + log.debug("Cassandra TimeoutException", ex); + throw new TemporaryBackendException(ex); + } + } + + private void schedule(long timeoutMS) throws TimeoutException { + requestScheduler.queue(Thread.currentThread(), "default", DatabaseDescriptor.getRpcTimeout()); + } + + /** + * Release count for the used up resources + */ + private void release() { + requestScheduler.release(); + } + + @Override + public void clearStorage() throws BackendException { + openStores.clear(); + try { + KSMetaData ksMetaData = Schema.instance.getKSMetaData(keySpaceName); + + // Not a big deal if Keyspace doesn't not exist (dropped manually by user or tests). + // This is called on per test setup basis to make sure that previous test cleaned + // everything up, so first invocation would always fail as Keyspace doesn't yet exist. + if (ksMetaData == null) + return; + + for (String cfName : ksMetaData.cfMetaData().keySet()) + StorageService.instance.truncate(keySpaceName, cfName); + } catch (Exception e) { + throw new PermanentBackendException(e); + } + } + + private void ensureKeyspaceExists(String keyspaceName) throws BackendException { + + if (null != Schema.instance.getKeyspaceInstance(keyspaceName)) + return; + + // Keyspace not found; create it + String strategyName = storageConfig.get(REPLICATION_STRATEGY); + + KSMetaData ksm; + try { + ksm = KSMetaData.newKeyspace(keyspaceName, strategyName, strategyOptions, true); + } catch (ConfigurationException e) { + throw new PermanentBackendException("Failed to instantiate keyspace metadata for " + keyspaceName, e); + } + try { + MigrationManager.announceNewKeyspace(ksm); + log.info("Created keyspace {}", keyspaceName); + } catch (ConfigurationException e) { + throw new PermanentBackendException("Failed to create keyspace " + keyspaceName, e); + } + } + + private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException { + ensureColumnFamilyExists(ksName, cfName, BytesType.instance); + } + + private void ensureColumnFamilyExists(String keyspaceName, String columnfamilyName, AbstractType<?> comparator) throws BackendException { + if (null != Schema.instance.getCFMetaData(keyspaceName, columnfamilyName)) + return; + + // Column Family not found; create it + CFMetaData cfm = new CFMetaData(keyspaceName, columnfamilyName, ColumnFamilyType.Standard, CellNames.fromAbstractType(comparator, true)); + + // Hard-coded caching settings + if (columnfamilyName.startsWith(Backend.EDGESTORE_NAME)) { + cfm.caching(CachingOptions.KEYS_ONLY); + } else if (columnfamilyName.startsWith(Backend.INDEXSTORE_NAME)) { + cfm.caching(CachingOptions.ROWS_ONLY); + } + + // Configure sstable compression + final CompressionParameters cp; + if (compressionEnabled) { + try { + cp = new CompressionParameters(compressionClass, + compressionChunkSizeKB * 1024, + Collections.<String, String>emptyMap()); + // CompressionParameters doesn't override toString(), so be explicit + log.debug("Creating CF {}: setting {}={} and {}={} on {}", + new Object[]{ + columnfamilyName, + CompressionParameters.SSTABLE_COMPRESSION, compressionClass, + CompressionParameters.CHUNK_LENGTH_KB, compressionChunkSizeKB, + cp}); + } catch (ConfigurationException ce) { + throw new PermanentBackendException(ce); + } + } else { + cp = new CompressionParameters(null); + log.debug("Creating CF {}: setting {} to null to disable compression", + columnfamilyName, CompressionParameters.SSTABLE_COMPRESSION); + } + cfm.compressionParameters(cp); + + try { + cfm.addDefaultIndexNames(); + } catch (ConfigurationException e) { + throw new PermanentBackendException("Failed to create column family metadata for " + keyspaceName + ":" + columnfamilyName, e); + } + try { + MigrationManager.announceNewColumnFamily(cfm); + log.info("Created CF {} in KS {}", columnfamilyName, keyspaceName); + } catch (ConfigurationException e) { + throw new PermanentBackendException("Failed to create column family " + keyspaceName + ":" + columnfamilyName, e); + } + + /* + * I'm chasing a nondetermistic exception that appears only rarely on my + * machine when executing the embedded cassandra tests. If these dummy + * reads ever actually fail and dump a log message, it could help debug + * the root cause. + * + * java.lang.RuntimeException: java.lang.IllegalArgumentException: Unknown table/cf pair (InternalCassandraEmbeddedKeyColumnValueTest.testStore1) + * at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1582) + * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) + * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) + * at java.lang.Thread.run(Thread.java:744) + * Caused by: java.lang.IllegalArgumentException: Unknown table/cf pair (InternalCassandraEmbeddedKeyColumnValueTest.testStore1) + * at org.apache.cassandra.db.Table.getColumnFamilyStore(Table.java:166) + * at org.apache.cassandra.db.Table.getRow(Table.java:354) + * at org.apache.cassandra.db.SliceFromReadCommand.getRow(SliceFromReadCommand.java:70) + * at org.apache.cassandra.service.StorageProxy$LocalReadRunnable.runMayThrow(StorageProxy.java:1052) + * at org.apache.cassandra.service.StorageProxy$DroppableRunnable.run(StorageProxy.java:1578) + * ... 3 more + */ + retryDummyRead(keyspaceName, columnfamilyName); + } + + @Override + public Map<String, String> getCompressionOptions(String cf) throws BackendException { + + CFMetaData cfm = Schema.instance.getCFMetaData(keySpaceName, cf); + + if (cfm == null) + return null; + + return ImmutableMap.copyOf(cfm.compressionParameters().asThriftOptions()); + } + + private void retryDummyRead(String ks, String cf) throws PermanentBackendException { + + final long limit = System.currentTimeMillis() + (60L * 1000L); + + while (System.currentTimeMillis() < limit) { + try { + SortedSet<CellName> names = new TreeSet<>(new Comparator<CellName>() { + // This is a singleton set. We need to define a comparator because SimpleDenseCellName is not + // comparable, but it doesn't have to be a useful comparator + @Override + public int compare(CellName o1, CellName o2) + { + return 0; + } + }); + names.add(CellNames.simpleDense(ByteBufferUtil.zeroByteBuffer(1))); + NamesQueryFilter nqf = new NamesQueryFilter(names); + SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(ks, ByteBufferUtil.zeroByteBuffer(1), cf, 1L, nqf); + StorageProxy.read(ImmutableList.<ReadCommand> of(cmd), ConsistencyLevel.QUORUM); + log.info("Read on CF {} in KS {} succeeded", cf, ks); + return; + } catch (Throwable t) { + log.warn("Failed to read CF {} in KS {} following creation", cf, ks, t); + } + + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new PermanentBackendException(e); + } + } + + throw new PermanentBackendException("Timed out while attempting to read CF " + cf + " in KS " + ks + " following creation"); + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java new file mode 100644 index 0000000..e44ab92 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java @@ -0,0 +1,536 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.util.*; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.commons.lang.ArrayUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +/** + * A Titan {@code KeyColumnValueStore} backed by Cassandra. + * This uses the Cassandra Thrift API. + * + * @author Dan LaRocque <dalaro@hopcount.org> + * @see CassandraThriftStoreManager + */ +public class CassandraThriftKeyColumnValueStore implements KeyColumnValueStore { + + private static final Logger logger = + LoggerFactory.getLogger(CassandraThriftKeyColumnValueStore.class); + + private static final Pattern BROKEN_BYTE_TOKEN_PATTERN = Pattern.compile("^Token\\(bytes\\[(.+)\\]\\)$"); + + // Cassandra access + private final CassandraThriftStoreManager storeManager; + private final String keyspace; + private final String columnFamily; + private final CTConnectionPool pool; + private final ThriftGetter entryGetter; + + public CassandraThriftKeyColumnValueStore(String keyspace, String columnFamily, CassandraThriftStoreManager storeManager, + CTConnectionPool pool) { + this.storeManager = storeManager; + this.keyspace = keyspace; + this.columnFamily = columnFamily; + this.pool = pool; + this.entryGetter = new ThriftGetter(storeManager.getMetaDataSchema(columnFamily)); + } + + /** + * Call Cassandra's Thrift get_slice() method. + * <p/> + * When columnEnd equals columnStart and either startInclusive + * or endInclusive is false (or both are false), then this + * method returns an empty list without making any Thrift calls. + * <p/> + * If columnEnd = columnStart + 1, and both startInclusive and + * startExclusive are false, then the arguments effectively form + * an empty interval. In this case, as in the one previous, + * an empty list is returned. However, it may not necessarily + * be handled efficiently; a Thrift call might still be made + * before returning the empty list. + * + * @throws com.thinkaurelius.titan.diskstorage.BackendException + * when columnEnd < columnStart + */ + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh); + return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); + } + + @Override + public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(keys, query, txh); + } + + public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key, + SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(ImmutableList.of(key),query,txh); + } + + public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys, + SliceQuery query, + StoreTransaction txh) throws BackendException { + ColumnParent parent = new ColumnParent(columnFamily); + /* + * Cassandra cannot handle columnStart = columnEnd. + * Cassandra's Thrift getSlice() throws InvalidRequestException + * if columnStart = columnEnd. + */ + if (query.getSliceStart().compareTo(query.getSliceEnd()) >= 0) { + // Check for invalid arguments where columnEnd < columnStart + if (query.getSliceEnd().compareTo(query.getSliceStart())<0) { + throw new PermanentBackendException("columnStart=" + query.getSliceStart() + + " is greater than columnEnd=" + query.getSliceEnd() + ". " + + "columnStart must be less than or equal to columnEnd"); + } + if (0 != query.getSliceStart().length() && 0 != query.getSliceEnd().length()) { + logger.debug("Return empty list due to columnEnd==columnStart and neither empty"); + return KCVSUtil.emptyResults(keys); + } + } + + assert query.getSliceStart().compareTo(query.getSliceEnd()) < 0; + ConsistencyLevel consistency = getTx(txh).getReadConsistencyLevel().getThrift(); + SlicePredicate predicate = new SlicePredicate(); + SliceRange range = new SliceRange(); + range.setCount(query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column + range.setStart(query.getSliceStart().asByteBuffer()); + range.setFinish(query.getSliceEnd().asByteBuffer()); + predicate.setSlice_range(range); + + CTConnection conn = null; + try { + conn = pool.borrowObject(keyspace); + Cassandra.Client client = conn.getClient(); + Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(CassandraHelper.convert(keys), + parent, + predicate, + consistency); + + /* + * The final size of the "result" List may be at most rows.size(). + * However, "result" could also be up to two elements smaller than + * rows.size(), depending on startInclusive and endInclusive + */ + Map<StaticBuffer, EntryList> results = new HashMap<StaticBuffer, EntryList>(); + + for (ByteBuffer key : rows.keySet()) { + results.put(StaticArrayBuffer.of(key), + CassandraHelper.makeEntryList(rows.get(key), entryGetter, query.getSliceEnd(), query.getLimit())); + } + + return results; + } catch (Exception e) { + throw convertException(e); + } finally { + pool.returnObjectUnsafe(keyspace, conn); + } + } + + private static class ThriftGetter implements StaticArrayEntry.GetColVal<ColumnOrSuperColumn,ByteBuffer> { + + private final EntryMetaData[] schema; + + private ThriftGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + @Override + public ByteBuffer getColumn(ColumnOrSuperColumn element) { + return element.getColumn().bufferForName(); + } + + @Override + public ByteBuffer getValue(ColumnOrSuperColumn element) { + return element.getColumn().bufferForValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(ColumnOrSuperColumn element) { + return schema; + } + + @Override + public Object getMetaData(ColumnOrSuperColumn element, EntryMetaData meta) { + switch(meta) { + case TIMESTAMP: + return element.getColumn().getTimestamp(); + case TTL: + return element.getColumn().getTtl(); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } + + @Override + public void close() { + // Do nothing + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, + StoreTransaction txh) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException { + final IPartitioner partitioner = storeManager.getCassandraPartitioner(); + + if (!(partitioner instanceof RandomPartitioner) && !(partitioner instanceof Murmur3Partitioner)) + throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used."); + + try { + return new AllTokensIterator(partitioner, sliceQuery, storeManager.getPageSize()); + } catch (Exception e) { + throw convertException(e); + } + } + + @Override + public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException { + final IPartitioner partitioner = storeManager.getCassandraPartitioner(); + + // see rant about the reason of this limitation in Astyanax implementation of this method. + if (!(partitioner instanceof AbstractByteOrderedPartitioner)) + throw new PermanentBackendException("This operation is only allowed when byte-ordered partitioner is used."); + + try { + return new KeyRangeIterator(partitioner, keyRangeQuery, storeManager.getPageSize(), + keyRangeQuery.getKeyStart().asByteBuffer(), + keyRangeQuery.getKeyEnd().asByteBuffer()); + } catch (Exception e) { + throw convertException(e); + } + } + + @Override + public String getName() { + return columnFamily; + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); + mutateMany(mutations, txh); + } + + public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh); + } + + static BackendException convertException(Throwable e) { + if (e instanceof TException) { + return new PermanentBackendException(e); + } else if (e instanceof TimedOutException) { + return new TemporaryBackendException(e); + } else if (e instanceof UnavailableException) { + return new TemporaryBackendException(e); + } else if (e instanceof InvalidRequestException) { + return new PermanentBackendException(e); + } else { + return new PermanentBackendException(e); + } + } + + @Override + public String toString() { + return "CassandraThriftKeyColumnValueStore[ks=" + + keyspace + ", cf=" + columnFamily + "]"; + } + + + private List<KeySlice> getKeySlice(ByteBuffer startKey, + ByteBuffer endKey, + SliceQuery columnSlice, + int count) throws BackendException { + return getRangeSlices(new org.apache.cassandra.thrift.KeyRange().setStart_key(startKey).setEnd_key(endKey).setCount(count), columnSlice); + } + + private <T extends Token> List<KeySlice> getTokenSlice(T startToken, T endToken, + SliceQuery sliceQuery, int count) throws BackendException { + + String st = sanitizeBrokenByteToken(startToken); + String et = sanitizeBrokenByteToken(endToken); + + org.apache.cassandra.thrift.KeyRange kr = new org.apache.cassandra.thrift.KeyRange().setStart_token(st).setEnd_token(et).setCount(count); + + return getRangeSlices(kr, sliceQuery); + } + + private String sanitizeBrokenByteToken(Token tok) { + /* + * Background: https://issues.apache.org/jira/browse/CASSANDRA-5566 + * + * This check is useful for compatibility with Cassandra server versions + * 1.2.4 and earlier. + */ + String st = tok.toString(); + if (!(tok instanceof BytesToken)) + return st; + + // Do a cheap 1-character startsWith before unleashing the regex + if (st.startsWith("T")) { + Matcher m = BROKEN_BYTE_TOKEN_PATTERN.matcher(st); + if (!m.matches()) { + logger.warn("Unknown token string format: \"{}\"", st); + } else { + String old = st; + st = m.group(1); + logger.debug("Rewrote token string: \"{}\" -> \"{}\"", old, st); + } + } + return st; + } + + private List<KeySlice> getRangeSlices(org.apache.cassandra.thrift.KeyRange keyRange, @Nullable SliceQuery sliceQuery) throws BackendException { + SliceRange sliceRange = new SliceRange(); + + if (sliceQuery == null) { + sliceRange.setStart(ArrayUtils.EMPTY_BYTE_ARRAY) + .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY) + .setCount(5); + } else { + sliceRange.setStart(sliceQuery.getSliceStart().asByteBuffer()) + .setFinish(sliceQuery.getSliceEnd().asByteBuffer()) + .setCount((sliceQuery.hasLimit()) ? sliceQuery.getLimit() : Integer.MAX_VALUE); + } + + + CTConnection connection = null; + try { + connection = pool.borrowObject(keyspace); + + List<KeySlice> slices = + connection.getClient().get_range_slices(new ColumnParent(columnFamily), + new SlicePredicate() + .setSlice_range(sliceRange), + keyRange, + ConsistencyLevel.QUORUM); + + for (KeySlice s : slices) { + logger.debug("Key {}", ByteBufferUtil.toString(s.key, "-")); + } + + /* Note: we need to fetch columns for each row as well to remove "range ghosts" */ + List<KeySlice> result = new ArrayList<>(slices.size()); + KeyIterationPredicate pred = new KeyIterationPredicate(); + for (KeySlice ks : slices) + if (pred.apply(ks)) + result.add(ks); + return result; + } catch (Exception e) { + throw convertException(e); + } finally { + if (connection != null) + pool.returnObjectUnsafe(keyspace, connection); + } + } + + private static class KeyIterationPredicate implements Predicate<KeySlice> { + + @Override + public boolean apply(@Nullable KeySlice row) { + return (row != null) && row.getColumns().size() > 0; + } + } + + /** + * Slices rows and columns using tokens. Recall that the partitioner turns + * keys into tokens. For instance, under RandomPartitioner, tokens are the + * MD5 hashes of keys. + */ + public class AbstractBufferedRowIter implements KeyIterator { + + private final int pageSize; + private final SliceQuery columnSlice; + + private boolean isClosed; + private boolean seenEnd; + protected Iterator<KeySlice> ksIter; + private KeySlice mostRecentRow; + + private final IPartitioner partitioner; + private Token nextStartToken; + private final Token endToken; + private ByteBuffer nextStartKey; + + private boolean omitEndToken; + + public AbstractBufferedRowIter(IPartitioner partitioner, + SliceQuery columnSlice, int pageSize, Token startToken, Token endToken, boolean omitEndToken) { + this.pageSize = pageSize; + this.partitioner = partitioner; + this.nextStartToken = startToken; + this.endToken = endToken; + this.columnSlice = columnSlice; + + this.seenEnd = false; + this.isClosed = false; + this.ksIter = Iterators.emptyIterator(); + this.mostRecentRow = null; + this.omitEndToken = omitEndToken; + } + + @Override + public boolean hasNext() { + ensureOpen(); + + if (!ksIter.hasNext() && !seenEnd) { + try { + ksIter = rebuffer().iterator(); + } catch (BackendException e) { + throw new RuntimeException(e); + } + } + + return ksIter.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + if (!hasNext()) + throw new NoSuchElementException(); + + mostRecentRow = ksIter.next(); + + Preconditions.checkNotNull(mostRecentRow); + return StaticArrayBuffer.of(mostRecentRow.bufferForKey()); + } + + @Override + public void close() { + closeIterator(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public RecordIterator<Entry> getEntries() { + ensureOpen(); + + return new RecordIterator<Entry>() { + final Iterator<Entry> columns = + CassandraHelper.makeEntryIterator(mostRecentRow.getColumns(), + entryGetter, columnSlice.getSliceEnd(), + columnSlice.getLimit()); + + @Override + public boolean hasNext() { + ensureOpen(); + return columns.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return columns.next(); + } + + @Override + public void close() { + closeIterator(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + + private void closeIterator() { + if (!isClosed) { + isClosed = true; + } + } + + private List<KeySlice> rebuffer() throws BackendException { + + Preconditions.checkArgument(!seenEnd); + + return checkFreshSlices(getNextKeySlices()); + } + + protected List<KeySlice> checkFreshSlices(List<KeySlice> ks) { + + if (0 == ks.size()) { + seenEnd = true; + return Collections.emptyList(); + } + + nextStartKey = ks.get(ks.size() - 1).bufferForKey(); + nextStartToken = partitioner.getToken(nextStartKey); + + if (nextStartToken.equals(endToken)) { + seenEnd = true; + if (omitEndToken) + ks.remove(ks.size() - 1); + } + + return ks; + } + + protected final List<KeySlice> getNextKeySlices() throws BackendException { + return getTokenSlice(nextStartToken, endToken, columnSlice, pageSize); + } + } + + private final class AllTokensIterator extends AbstractBufferedRowIter { + public AllTokensIterator(IPartitioner partitioner, SliceQuery columnSlice, int pageSize) { + super(partitioner, columnSlice, pageSize, partitioner.getMinimumToken(), partitioner.getMinimumToken(), false); + } + } + + private final class KeyRangeIterator extends AbstractBufferedRowIter { + public KeyRangeIterator(IPartitioner partitioner, SliceQuery columnSlice, + int pageSize, ByteBuffer startKey, ByteBuffer endKey) throws BackendException { + super(partitioner, columnSlice, pageSize, partitioner.getToken(startKey), partitioner.getToken(endKey), true); + + Preconditions.checkArgument(partitioner instanceof AbstractByteOrderedPartitioner); + + // Get first slice with key range instead of token range. Token + // ranges are start-exclusive, key ranges are start-inclusive. Both + // are end-inclusive. If we don't make the call below, then we will + // erroneously miss startKey. + List<KeySlice> ks = getKeySlice(startKey, endKey, columnSlice, pageSize); + + this.ksIter = checkFreshSlices(ks).iterator(); + } + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java new file mode 100644 index 0000000..d904860 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftStoreManager.java @@ -0,0 +1,621 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.thinkaurelius.titan.diskstorage.EntryMetaData; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import com.thinkaurelius.titan.util.system.NetworkUtil; + +import org.apache.cassandra.dht.AbstractByteOrderedPartitioner; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; + +import static com.thinkaurelius.titan.diskstorage.configuration.ConfigOption.disallowEmpty; + +/** + * This class creates {@see CassandraThriftKeyColumnValueStore}s and + * handles Cassandra-backed allocation of vertex IDs for Titan (when so + * configured). + * + * @author Dan LaRocque <dalaro@hopcount.org> + */ +@PreInitializeConfigOptions +public class CassandraThriftStoreManager extends AbstractCassandraStoreManager { + + public enum PoolExhaustedAction { + BLOCK(GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK), + FAIL(GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL), + GROW(GenericKeyedObjectPool.WHEN_EXHAUSTED_GROW); + + private final byte b; + + PoolExhaustedAction(byte b) { + this.b = b; + } + + public byte getByte() { + return b; + } + } + + private static final Logger log = LoggerFactory.getLogger(CassandraThriftStoreManager.class); + + public static final ConfigNamespace THRIFT_NS = + new ConfigNamespace(AbstractCassandraStoreManager.CASSANDRA_NS, "thrift", + "Options for Titan's own Thrift Cassandra backend"); + + public static final ConfigNamespace CPOOL_NS = + new ConfigNamespace(THRIFT_NS, "cpool", "Options for the Apache commons-pool connection manager"); + + public static final ConfigOption<String> CPOOL_WHEN_EXHAUSTED = + new ConfigOption<>(CPOOL_NS, "when-exhausted", + "What to do when clients concurrently request more active connections than are allowed " + + "by the pool. The value must be one of BLOCK, FAIL, or GROW.", + ConfigOption.Type.MASKABLE, String.class, PoolExhaustedAction.BLOCK.toString(), + disallowEmpty(String.class)); + + public static final ConfigOption<Integer> CPOOL_MAX_TOTAL = + new ConfigOption<Integer>(CPOOL_NS, "max-total", + "Max number of allowed Thrift connections, idle or active (-1 to leave undefined)", + ConfigOption.Type.MASKABLE, -1); + + public static final ConfigOption<Integer> CPOOL_MAX_ACTIVE = + new ConfigOption<Integer>(CPOOL_NS, "max-active", + "Maximum number of concurrently in-use connections (-1 to leave undefined)", + ConfigOption.Type.MASKABLE, 16); + + public static final ConfigOption<Integer> CPOOL_MAX_IDLE = + new ConfigOption<Integer>(CPOOL_NS, "max-idle", + "Maximum number of concurrently idle connections (-1 to leave undefined)", + ConfigOption.Type.MASKABLE, 4); + + public static final ConfigOption<Integer> CPOOL_MIN_IDLE = + new ConfigOption<Integer>(CPOOL_NS, "min-idle", + "Minimum number of idle connections the pool attempts to maintain", + ConfigOption.Type.MASKABLE, 0); + + // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration + public static final ConfigOption<Long> CPOOL_MAX_WAIT = + new ConfigOption<Long>(CPOOL_NS, "max-wait", + "Maximum number of milliseconds to block when " + ConfigElement.getPath(CPOOL_WHEN_EXHAUSTED) + + " is set to BLOCK. Has no effect when set to actions besides BLOCK. Set to -1 to wait indefinitely.", + ConfigOption.Type.MASKABLE, -1L); + + // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration + public static final ConfigOption<Long> CPOOL_EVICTOR_PERIOD = + new ConfigOption<Long>(CPOOL_NS, "evictor-period", + "Approximate number of milliseconds between runs of the idle connection evictor. " + + "Set to -1 to never run the idle connection evictor.", + ConfigOption.Type.MASKABLE, 30L * 1000L); + + // Wart: allowing -1 like commons-pool's convention precludes using StandardDuration + public static final ConfigOption<Long> CPOOL_MIN_EVICTABLE_IDLE_TIME = + new ConfigOption<Long>(CPOOL_NS, "min-evictable-idle-time", + "Minimum number of milliseconds a connection must be idle before it is eligible for " + + "eviction. See also " + ConfigElement.getPath(CPOOL_EVICTOR_PERIOD) + ". Set to -1 to never evict " + + "idle connections.", ConfigOption.Type.MASKABLE, 60L * 1000L); + + public static final ConfigOption<Boolean> CPOOL_IDLE_TESTS = + new ConfigOption<Boolean>(CPOOL_NS, "idle-test", + "Whether the idle connection evictor validates idle connections and drops those that fail to validate", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption<Integer> CPOOL_IDLE_TESTS_PER_EVICTION_RUN = + new ConfigOption<Integer>(CPOOL_NS, "idle-tests-per-eviction-run", + "When the value is negative, e.g. -n, roughly one nth of the idle connections are tested per run. " + + "When the value is positive, e.g. n, the min(idle-count, n) connections are tested per run.", + ConfigOption.Type.MASKABLE, 0); + + + private final Map<String, CassandraThriftKeyColumnValueStore> openStores; + private final CTConnectionPool pool; + private final Deployment deployment; + + public CassandraThriftStoreManager(Configuration config) throws BackendException { + super(config); + + /* + * This is eventually passed to Thrift's TSocket constructor. The + * constructor parameter is of type int. + */ + int thriftTimeoutMS = (int)config.get(GraphDatabaseConfiguration.CONNECTION_TIMEOUT).toMillis(); + + CTConnectionFactory.Config factoryConfig = new CTConnectionFactory.Config(hostnames, port, username, password) + .setTimeoutMS(thriftTimeoutMS) + .setFrameSize(thriftFrameSizeBytes); + + if (config.get(SSL_ENABLED)) { + factoryConfig.setSSLTruststoreLocation(config.get(SSL_TRUSTSTORE_LOCATION)); + factoryConfig.setSSLTruststorePassword(config.get(SSL_TRUSTSTORE_PASSWORD)); + } + + final PoolExhaustedAction poolExhaustedAction = ConfigOption.getEnumValue( + config.get(CPOOL_WHEN_EXHAUSTED), PoolExhaustedAction.class); + + CTConnectionPool p = new CTConnectionPool(factoryConfig.build()); + p.setTestOnBorrow(true); + p.setTestOnReturn(true); + p.setTestWhileIdle(config.get(CPOOL_IDLE_TESTS)); + p.setNumTestsPerEvictionRun(config.get(CPOOL_IDLE_TESTS_PER_EVICTION_RUN)); + p.setWhenExhaustedAction(poolExhaustedAction.getByte()); + p.setMaxActive(config.get(CPOOL_MAX_ACTIVE)); + p.setMaxTotal(config.get(CPOOL_MAX_TOTAL)); // maxTotal limits active + idle + p.setMaxIdle(config.get(CPOOL_MAX_IDLE)); + p.setMinIdle(config.get(CPOOL_MIN_IDLE)); + p.setMaxWait(config.get(CPOOL_MAX_WAIT)); + p.setTimeBetweenEvictionRunsMillis(config.get(CPOOL_EVICTOR_PERIOD)); + p.setMinEvictableIdleTimeMillis(config.get(CPOOL_MIN_EVICTABLE_IDLE_TIME)); + this.pool = p; + + this.openStores = new HashMap<String, CassandraThriftKeyColumnValueStore>(); + + // Only watch the ring and change endpoints with BOP + if (getCassandraPartitioner() instanceof ByteOrderedPartitioner) { + deployment = (hostnames.length == 1)// mark deployment as local only in case we have byte ordered partitioner and local connection + ? (NetworkUtil.isLocalConnection(hostnames[0])) ? Deployment.LOCAL : Deployment.REMOTE + : Deployment.REMOTE; + } else { + deployment = Deployment.REMOTE; + } + } + + @Override + public Deployment getDeployment() { + return deployment; + } + + @Override + @SuppressWarnings("unchecked") + public IPartitioner getCassandraPartitioner() throws BackendException { + CTConnection conn = null; + try { + conn = pool.borrowObject(SYSTEM_KS); + return FBUtilities.newPartitioner(conn.getClient().describe_partitioner()); + } catch (Exception e) { + throw new TemporaryBackendException(e); + } finally { + pool.returnObjectUnsafe(SYSTEM_KS, conn); + } + } + + @Override + public String toString() { + return "thriftCassandra" + super.toString(); + } + + @Override + public void close() throws BackendException { + openStores.clear(); + closePool(); + } + + @Override + public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException { + Preconditions.checkNotNull(mutations); + + final MaskedTimestamp commitTime = new MaskedTimestamp(txh); + + ConsistencyLevel consistency = getTx(txh).getWriteConsistencyLevel().getThrift(); + + // Generate Thrift-compatible batch_mutate() datastructure + // key -> cf -> cassmutation + int size = 0; + for (Map<StaticBuffer, KCVMutation> mutation : mutations.values()) size += mutation.size(); + Map<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>> batch = + new HashMap<ByteBuffer, Map<String, List<org.apache.cassandra.thrift.Mutation>>>(size); + + + for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> keyMutation : mutations.entrySet()) { + String columnFamily = keyMutation.getKey(); + for (Map.Entry<StaticBuffer, KCVMutation> mutEntry : keyMutation.getValue().entrySet()) { + ByteBuffer keyBB = mutEntry.getKey().asByteBuffer(); + + // Get or create the single Cassandra Mutation object responsible for this key + Map<String, List<org.apache.cassandra.thrift.Mutation>> cfmutation = batch.get(keyBB); + if (cfmutation == null) { + cfmutation = new HashMap<String, List<org.apache.cassandra.thrift.Mutation>>(3); // Most mutations only modify the edgeStore and indexStore + batch.put(keyBB, cfmutation); + } + + KCVMutation mutation = mutEntry.getValue(); + List<org.apache.cassandra.thrift.Mutation> thriftMutation = + new ArrayList<org.apache.cassandra.thrift.Mutation>(mutations.size()); + + if (mutation.hasDeletions()) { + for (StaticBuffer buf : mutation.getDeletions()) { + Deletion d = new Deletion(); + SlicePredicate sp = new SlicePredicate(); + sp.addToColumn_names(buf.as(StaticBuffer.BB_FACTORY)); + d.setPredicate(sp); + d.setTimestamp(commitTime.getDeletionTime(times)); + org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation(); + m.setDeletion(d); + thriftMutation.add(m); + } + } + + if (mutation.hasAdditions()) { + for (Entry ent : mutation.getAdditions()) { + ColumnOrSuperColumn cosc = new ColumnOrSuperColumn(); + Column column = new Column(ent.getColumnAs(StaticBuffer.BB_FACTORY)); + column.setValue(ent.getValueAs(StaticBuffer.BB_FACTORY)); + + column.setTimestamp(commitTime.getAdditionTime(times)); + + Integer ttl = (Integer) ent.getMetaData().get(EntryMetaData.TTL); + if (null != ttl && ttl > 0) { + column.setTtl(ttl); + } + + cosc.setColumn(column); + org.apache.cassandra.thrift.Mutation m = new org.apache.cassandra.thrift.Mutation(); + m.setColumn_or_supercolumn(cosc); + thriftMutation.add(m); + } + } + + cfmutation.put(columnFamily, thriftMutation); + } + } + + CTConnection conn = null; + try { + conn = pool.borrowObject(keySpaceName); + Cassandra.Client client = conn.getClient(); + if (atomicBatch) { + client.atomic_batch_mutate(batch, consistency); + } else { + client.batch_mutate(batch, consistency); + } + } catch (Exception ex) { + throw CassandraThriftKeyColumnValueStore.convertException(ex); + } finally { + pool.returnObjectUnsafe(keySpaceName, conn); + } + + sleepAfterWrite(txh, commitTime); + } + + @Override // TODO: *BIG FAT WARNING* 'synchronized is always *bad*, change openStores to use ConcurrentLinkedHashMap + public synchronized CassandraThriftKeyColumnValueStore openDatabase(final String name, StoreMetaData.Container metaData) throws BackendException { + if (openStores.containsKey(name)) + return openStores.get(name); + + ensureColumnFamilyExists(keySpaceName, name); + + CassandraThriftKeyColumnValueStore store = new CassandraThriftKeyColumnValueStore(keySpaceName, name, this, pool); + openStores.put(name, store); + return store; + } + + @Override + public List<KeyRange> getLocalKeyPartition() throws BackendException { + CTConnection conn = null; + IPartitioner partitioner = getCassandraPartitioner(); + + if (!(partitioner instanceof AbstractByteOrderedPartitioner)) + throw new UnsupportedOperationException("getLocalKeyPartition() only supported by byte ordered partitioner."); + + Token.TokenFactory tokenFactory = partitioner.getTokenFactory(); + + try { + // Resist the temptation to describe SYSTEM_KS. It has no ring. + // Instead, we'll create our own keyspace (or check that it exists), then describe it. + ensureKeyspaceExists(keySpaceName); + + conn = pool.borrowObject(keySpaceName); + List<TokenRange> ranges = conn.getClient().describe_ring(keySpaceName); + List<KeyRange> keyRanges = new ArrayList<KeyRange>(ranges.size()); + + for (TokenRange range : ranges) { + if (!NetworkUtil.hasLocalAddress(range.endpoints)) + continue; + + keyRanges.add(CassandraHelper.transformRange(tokenFactory.fromString(range.start_token), tokenFactory.fromString(range.end_token))); + } + + return keyRanges; + } catch (Exception e) { + throw CassandraThriftKeyColumnValueStore.convertException(e); + } finally { + pool.returnObjectUnsafe(keySpaceName, conn); + } + } + + /** + * Connect to Cassandra via Thrift on the specified host and port and attempt to truncate the named keyspace. + * <p/> + * This is a utility method intended mainly for testing. It is + * equivalent to issuing 'truncate <cf>' for each of the column families in keyspace using + * the cassandra-cli tool. + * <p/> + * Using truncate is better for a number of reasons, most significantly because it doesn't + * involve any schema modifications which can take time to propagate across the cluster such + * leaves nodes in the inconsistent state and could result in read/write failures. + * Any schema modifications are discouraged until there is no traffic to Keyspace or ColumnFamilies. + * + * @throws com.thinkaurelius.titan.diskstorage.BackendException if any checked Thrift or UnknownHostException is thrown in the body of this method + */ + public void clearStorage() throws BackendException { + openStores.clear(); + final String lp = "ClearStorage: "; // "log prefix" + /* + * log4j is capable of automatically writing the name of a method that + * generated a log message, but the docs warn that "generating caller + * location information is extremely slow and should be avoided unless + * execution speed is not an issue." + */ + + CTConnection conn = null; + try { + conn = pool.borrowObject(SYSTEM_KS); + Cassandra.Client client = conn.getClient(); + + KsDef ksDef; + try { + client.set_keyspace(keySpaceName); + ksDef = client.describe_keyspace(keySpaceName); + } catch (NotFoundException e) { + log.debug(lp + "Keyspace {} does not exist, not attempting to truncate.", keySpaceName); + return; + } catch (InvalidRequestException e) { + log.debug(lp + "InvalidRequestException when attempting to describe keyspace {}, not attempting to truncate.", keySpaceName); + return; + } + + + if (null == ksDef) { + log.debug(lp + "Received null KsDef for keyspace {}; not truncating its CFs", keySpaceName); + return; + } + + List<CfDef> cfDefs = ksDef.getCf_defs(); + + if (null == cfDefs) { + log.debug(lp + "Received empty CfDef list for keyspace {}; not truncating CFs", keySpaceName); + return; + } + + for (CfDef cfDef : ksDef.getCf_defs()) { + client.truncate(cfDef.name); + log.info(lp + "Truncated CF {} in keyspace {}", cfDef.name, keySpaceName); + } + + /* + * Clearing the CTConnectionPool is unnecessary. This method + * removes no keyspaces. All open Cassandra connections will + * remain valid. + */ + } catch (Exception e) { + throw new TemporaryBackendException(e); + } finally { + if (conn != null && conn.getClient() != null) { + try { + conn.getClient().set_keyspace(SYSTEM_KS); + } catch (InvalidRequestException e) { + log.warn("Failed to reset keyspace", e); + } catch (TException e) { + log.warn("Failed to reset keyspace", e); + } + } + pool.returnObjectUnsafe(SYSTEM_KS, conn); + } + } + + private KsDef ensureKeyspaceExists(String keyspaceName) throws TException, BackendException { + CTConnection connection = null; + + try { + connection = pool.borrowObject(SYSTEM_KS); + Cassandra.Client client = connection.getClient(); + + try { + // Side effect: throws Exception if keyspaceName doesn't exist + client.set_keyspace(keyspaceName); // Don't remove + client.set_keyspace(SYSTEM_KS); + log.debug("Found existing keyspace {}", keyspaceName); + } catch (InvalidRequestException e) { + // Keyspace didn't exist; create it + log.debug("Creating keyspace {}...", keyspaceName); + + KsDef ksdef = new KsDef().setName(keyspaceName) + .setCf_defs(new LinkedList<CfDef>()) // cannot be null but can be empty + .setStrategy_class(storageConfig.get(REPLICATION_STRATEGY)) + .setStrategy_options(strategyOptions); + + client.set_keyspace(SYSTEM_KS); + try { + client.system_add_keyspace(ksdef); + retrySetKeyspace(keyspaceName, client); + log.debug("Created keyspace {}", keyspaceName); + } catch (InvalidRequestException ire) { + log.error("system_add_keyspace failed for keyspace=" + keyspaceName, ire); + throw ire; + } + + } + + return client.describe_keyspace(keyspaceName); + } catch (Exception e) { + throw new TemporaryBackendException(e); + } finally { + pool.returnObjectUnsafe(SYSTEM_KS, connection); + } + } + + private void retrySetKeyspace(String ksName, Cassandra.Client client) throws BackendException { + final long end = System.currentTimeMillis() + (60L * 1000L); + + while (System.currentTimeMillis() <= end) { + try { + client.set_keyspace(ksName); + return; + } catch (Exception e) { + log.warn("Exception when changing to keyspace {} after creating it", ksName, e); + try { + Thread.sleep(1000L); + } catch (InterruptedException ie) { + throw new PermanentBackendException("Unexpected interrupt (shutting down?)", ie); + } + } + } + + throw new PermanentBackendException("Could change to keyspace " + ksName + " after creating it"); + } + + private void ensureColumnFamilyExists(String ksName, String cfName) throws BackendException { + ensureColumnFamilyExists(ksName, cfName, "org.apache.cassandra.db.marshal.BytesType"); + } + + private void ensureColumnFamilyExists(String ksName, String cfName, String comparator) throws BackendException { + CTConnection conn = null; + try { + KsDef keyspaceDef = ensureKeyspaceExists(ksName); + + conn = pool.borrowObject(ksName); + Cassandra.Client client = conn.getClient(); + + log.debug("Looking up metadata on keyspace {}...", ksName); + + boolean foundColumnFamily = false; + for (CfDef cfDef : keyspaceDef.getCf_defs()) { + String curCfName = cfDef.getName(); + if (curCfName.equals(cfName)) + foundColumnFamily = true; + } + + if (!foundColumnFamily) { + createColumnFamily(client, ksName, cfName, comparator); + } else { + log.debug("Keyspace {} and ColumnFamily {} were found.", ksName, cfName); + } + } catch (SchemaDisagreementException e) { + throw new TemporaryBackendException(e); + } catch (Exception e) { + throw new PermanentBackendException(e); + } finally { + pool.returnObjectUnsafe(ksName, conn); + } + } + + private void createColumnFamily(Cassandra.Client client, + String ksName, + String cfName, + String comparator) throws BackendException { + + CfDef createColumnFamily = new CfDef(); + createColumnFamily.setName(cfName); + createColumnFamily.setKeyspace(ksName); + createColumnFamily.setComparator_type(comparator); + + ImmutableMap.Builder<String, String> compressionOptions = new ImmutableMap.Builder<String, String>(); + + if (compressionEnabled) { + compressionOptions.put("sstable_compression", compressionClass) + .put("chunk_length_kb", Integer.toString(compressionChunkSizeKB)); + } + + createColumnFamily.setCompression_options(compressionOptions.build()); + + // Hard-coded caching settings + if (cfName.startsWith(Backend.EDGESTORE_NAME)) { + createColumnFamily.setCaching("keys_only"); + } else if (cfName.startsWith(Backend.INDEXSTORE_NAME)) { + createColumnFamily.setCaching("rows_only"); + } + + log.debug("Adding column family {} to keyspace {}...", cfName, ksName); + try { + client.system_add_column_family(createColumnFamily); + } catch (SchemaDisagreementException e) { + throw new TemporaryBackendException("Error in setting up column family", e); + } catch (Exception e) { + throw new PermanentBackendException(e); + } + + log.debug("Added column family {} to keyspace {}.", cfName, ksName); + } + + @Override + public Map<String, String> getCompressionOptions(String cf) throws BackendException { + CTConnection conn = null; + Map<String, String> result = null; + + try { + conn = pool.borrowObject(keySpaceName); + Cassandra.Client client = conn.getClient(); + + KsDef ksDef = client.describe_keyspace(keySpaceName); + + for (CfDef cfDef : ksDef.getCf_defs()) { + if (null != cfDef && cfDef.getName().equals(cf)) { + result = cfDef.getCompression_options(); + break; + } + } + + return result; + } catch (InvalidRequestException e) { + log.debug("Keyspace {} does not exist", keySpaceName); + return null; + } catch (Exception e) { + throw new TemporaryBackendException(e); + } finally { + pool.returnObjectUnsafe(keySpaceName, conn); + } + } + + private void closePool() { + /* + * pool.close() does not affect borrowed connections. + * + * Connections currently borrowed by some thread which are + * talking to the old host will eventually be destroyed by + * CTConnectionFactory#validateObject() returning false when + * those connections are returned to the pool. + */ + try { + pool.close(); + log.info("Closed Thrift connection pooler."); + } catch (Exception e) { + log.warn("Failed to close connection pooler. " + + "We might be leaking Cassandra connections.", e); + // There's still hope: CTConnectionFactory#validateObject() + // will be called on borrow() and might tear down the + // connections that close() failed to tear down + } + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java new file mode 100644 index 0000000..3657fb9 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnection.java @@ -0,0 +1,55 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool; + +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Cassandra.Client; +import org.apache.thrift.transport.TTransport; + +import java.io.Closeable; + +/** + * Wraps a {@code Cassandra.Client} instance, its underlying {@code TTransport} + * instance, and the {@link com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.Config} instance used to setup + * the connection. + * + * @see CTConnectionFactory + * @see CTConnectionPool + * @author Dan LaRocque <dalaro@hopcount.org> + */ +public class CTConnection implements Closeable { + + private final TTransport transport; + private final Cassandra.Client client; + private final CTConnectionFactory.Config cfg; + + public CTConnection(TTransport transport, Client client, CTConnectionFactory.Config cfg) { + this.transport = transport; + this.client = client; + this.cfg = cfg; + } + + public TTransport getTransport() { + return transport; + } + + public Cassandra.Client getClient() { + return client; + } + + public CTConnectionFactory.Config getConfig() { + return cfg; + } + + public boolean isOpen() { + return transport.isOpen(); + } + @Override + public void close() { + if (transport != null && transport.isOpen()) + transport.close(); + } + + @Override + public String toString() { + return "CTConnection [transport=" + transport + ", client=" + client + ", cfg=" + cfg + "]"; + } +}
\ No newline at end of file diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java new file mode 100644 index 0000000..1c60cfd --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionFactory.java @@ -0,0 +1,210 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool; + +import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.thrift.*; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A factory compatible with Apache commons-pool for Cassandra Thrift API + * connections. + * + * @author Dan LaRocque <dalaro@hopcount.org> + */ +public class CTConnectionFactory implements KeyedPoolableObjectFactory<String, CTConnection> { + + private static final Logger log = LoggerFactory.getLogger(CTConnectionFactory.class); + private static final long SCHEMA_WAIT_MAX = 5000L; + private static final long SCHEMA_WAIT_INCREMENT = 25L; + + private final AtomicReference<Config> cfgRef; + + private CTConnectionFactory(Config config) { + this.cfgRef = new AtomicReference<Config>(config); + } + + @Override + public void activateObject(String key, CTConnection c) throws Exception { + // Do nothing, as in passivateObject + } + + @Override + public void destroyObject(String key, CTConnection c) throws Exception { + TTransport t = c.getTransport(); + + if (t.isOpen()) { + t.close(); + log.trace("Closed transport {}", t); + } else { + log.trace("Not closing transport {} (already closed)", t); + } + } + + @Override + public CTConnection makeObject(String key) throws Exception { + CTConnection conn = makeRawConnection(); + Cassandra.Client client = conn.getClient(); + client.set_keyspace(key); + + return conn; + } + + /** + * Create a Cassandra-Thrift connection, but do not attempt to + * set a keyspace on the connection. + * + * @return A CTConnection ready to talk to a Cassandra cluster + * @throws TTransportException on any Thrift transport failure + */ + public CTConnection makeRawConnection() throws TTransportException { + final Config cfg = cfgRef.get(); + + String hostname = cfg.getRandomHost(); + + log.debug("Creating TSocket({}, {}, {}, {}, {})", hostname, cfg.port, cfg.username, cfg.password, cfg.timeoutMS); + + TSocket socket; + if (null != cfg.sslTruststoreLocation && !cfg.sslTruststoreLocation.isEmpty()) { + TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters() {{ + setTrustStore(cfg.sslTruststoreLocation, cfg.sslTruststorePassword); + }}; + socket = TSSLTransportFactory.getClientSocket(hostname, cfg.port, cfg.timeoutMS, params); + } else { + socket = new TSocket(hostname, cfg.port, cfg.timeoutMS); + } + + TTransport transport = new TFramedTransport(socket, cfg.frameSize); + log.trace("Created transport {}", transport); + TBinaryProtocol protocol = new TBinaryProtocol(transport); + Cassandra.Client client = new Cassandra.Client(protocol); + if (!transport.isOpen()) { + transport.open(); + } + + if (cfg.username != null) { + Map<String, String> credentials = new HashMap<String, String>() {{ + put(IAuthenticator.USERNAME_KEY, cfg.username); + put(IAuthenticator.PASSWORD_KEY, cfg.password); + }}; + + try { + client.login(new AuthenticationRequest(credentials)); + } catch (Exception e) { // TTransportException will propagate authentication/authorization failure + throw new TTransportException(e); + } + } + return new CTConnection(transport, client, cfg); + } + + @Override + public void passivateObject(String key, CTConnection o) throws Exception { + // Do nothing, as in activateObject + } + + @Override + public boolean validateObject(String key, CTConnection c) { + Config curCfg = cfgRef.get(); + + boolean isSameConfig = c.getConfig().equals(curCfg); + if (log.isDebugEnabled()) { + if (isSameConfig) { + log.trace("Validated {} by configuration {}", c, curCfg); + } else { + log.trace("Rejected {}; current config is {}; rejected connection config is {}", + c, curCfg, c.getConfig()); + } + } + + return isSameConfig && c.isOpen(); + } + + public static class Config { + + private final String[] hostnames; + private final int port; + private final String username; + private final String password; + private final Random random; + + private int timeoutMS; + private int frameSize; + + private String sslTruststoreLocation; + private String sslTruststorePassword; + + private boolean isBuilt; + + public Config(String[] hostnames, int port, String username, String password) { + this.hostnames = hostnames; + this.port = port; + this.username = username; + this.password = password; + this.random = new Random(); + } + + // TODO: we don't really need getters/setters here as all of the fields are final and immutable + + public String getHostname() { + return hostnames[0]; + } + + public int getPort() { + return port; + } + + public String getRandomHost() { + return hostnames.length == 1 ? hostnames[0] : hostnames[random.nextInt(hostnames.length)]; + } + + public Config setTimeoutMS(int timeoutMS) { + checkIfAlreadyBuilt(); + this.timeoutMS = timeoutMS; + return this; + } + + public Config setFrameSize(int frameSize) { + checkIfAlreadyBuilt(); + this.frameSize = frameSize; + return this; + } + + public Config setSSLTruststoreLocation(String location) { + checkIfAlreadyBuilt(); + this.sslTruststoreLocation = location; + return this; + } + + public Config setSSLTruststorePassword(String password) { + checkIfAlreadyBuilt(); + this.sslTruststorePassword = password; + return this; + } + + public CTConnectionFactory build() { + isBuilt = true; + return new CTConnectionFactory(this); + } + + + public void checkIfAlreadyBuilt() { + if (isBuilt) + throw new IllegalStateException("Can't accept modifications when used with built factory."); + } + + @Override + public String toString() { + return "Config[hostnames=" + StringUtils.join(hostnames, ',') + ", port=" + port + + ", timeoutMS=" + timeoutMS + ", frameSize=" + frameSize + + "]"; + } + } + +} + diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java new file mode 100644 index 0000000..1af5630 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/thriftpool/CTConnectionPool.java @@ -0,0 +1,64 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool; + +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class extends Apache Commons Pool's GenericKeyedObjectPool, adding + * two methods that support Java 5 generic type safety. However, a + * programmer can still cause RuntimeExceptions related to type errors + * by mixing calls to these additional methods with calls to the legacy + * "Object"-typed methods. + * <p/> + * <p/> + * Unfortunately, GenericKeyedObjectPool is not actually generic in the + * type-system sense. All of its methods are typed to Object, forcing the + * client programmer to sprinkle code with casts. This class centralizes + * that casting to a single method. + * <p/> + * <p/> + * As a corollary, this class is slightly less flexible than + * GenericKeyedObjectPool, as this class can only store keys and pooled + * objects each of a single type, whereas GenericKeyedObjectPool could + * theoretically contain heterogeneous types of each. However, I do not + * need the flexibility of heterogeneous types for pooling Thrift + * connections, the original work that precipitated writing this class. + * + * @param <K> Key type + * @param <V> Pooled object type + * @author Dan LaRocque <dalaro@hopcount.org> + */ +public class CTConnectionPool extends GenericKeyedObjectPool<String, CTConnection> { + + private static final Logger log = + LoggerFactory.getLogger(CTConnectionPool.class); + + public CTConnectionPool(KeyedPoolableObjectFactory<String, CTConnection> factory) { + super(factory); + } + + /** + * If {@code conn} is non-null and is still open, then call + * {@link GenericKeyedObjectPool#returnObject(String, CTConnection), + * catching and logging and Exception that method might generate. + * This method does not emit any exceptions. + * + * @param keyspace The key of the pooled object being returned + * @param conn The pooled object being returned, or null to do nothing + */ + public void returnObjectUnsafe(String keyspace, CTConnection conn) { + if (conn != null && conn.isOpen()) { + try { + returnObject(keyspace, conn); + } catch (Exception e) { + log.warn("Failed to return Cassandra connection to pool", e); + log.warn( + "Failure context: keyspace={}, pool={}, connection={}", + new Object[] { keyspace, this, conn }); + } + } + } +} + diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java new file mode 100644 index 0000000..acd87db --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraDaemonWrapper.java @@ -0,0 +1,87 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.utils; + +import java.lang.Thread.UncaughtExceptionHandler; + +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.CassandraDaemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class starts a Thrift CassandraDaemon inside the current JVM. This class + * supports testing and shouldn't be used in production. + * + * This class starts Cassandra on the first invocation of + * {@link CassandraDaemonWrapper#start(String)} in the life of the JVM. + * Invocations after the first have no effect except that they may log a + * warning. + * + * When the thread that first called {@code #start(String)} dies, a daemon + * thread returns from {@link Thread#join()} and kills all embedded Cassandra + * threads in the JVM. + * + * This class once supported consecutive, idempotent calls to start(String) so + * long as the argument was the same in each invocation. It also once used + * refcounting to kill Cassandra's non-daemon threads once stop() was called as + * many times as start(). Some of Cassandra's background threads and statics + * can't be easily reset to allow a restart inside the same JVM, so this was + * intended as a one-use thing. However, this interacts poorly with the new + * KCVConfiguration system in titan-core. When KCVConfiguration is in use, core + * starts and stops each backend at least twice in the course of opening a + * single database instance. So the old refcounting and killing approach is out. + * + * @author Dan LaRocque <dalaro@hopcount.org> + */ +public class CassandraDaemonWrapper { + + private static final Logger log = + LoggerFactory.getLogger(CassandraDaemonWrapper.class); + + private static String activeConfig; + + private static boolean started; + + public static synchronized void start(String config) { + + if (started) { + if (null != config && !config.equals(activeConfig)) { + log.warn("Can't start in-process Cassandra instance " + + "with yaml path {} because an instance was " + + "previously started with yaml path {}", + config, activeConfig); + } + + return; + } + + started = true; + + log.debug("Current working directory: {}", System.getProperty("user.dir")); + + System.setProperty("cassandra.config", config); + // Prevent Cassandra from closing stdout/stderr streams + System.setProperty("cassandra-foreground", "yes"); + // Prevent Cassandra from overwriting Log4J configuration + System.setProperty("log4j.defaultInitOverride", "false"); + + log.info("Starting cassandra with {}", config); + + /* + * This main method doesn't block for any substantial length of time. It + * creates and starts threads and returns in relatively short order. + */ + CassandraDaemon.main(new String[0]); + + activeConfig = config; + } + + public static synchronized boolean isStarted() { + return started; + } + + public static void stop() { + // Do nothing + } +} diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java new file mode 100644 index 0000000..f8bd071 --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/utils/CassandraHelper.java @@ -0,0 +1,139 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.utils; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.thinkaurelius.titan.diskstorage.EntryList; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.Entry; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; +import com.thinkaurelius.titan.diskstorage.util.BufferUtil; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList; +import org.apache.cassandra.dht.BytesToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import javax.annotation.Nullable; + +public class CassandraHelper { + + public static List<ByteBuffer> convert(List<StaticBuffer> keys) { + List<ByteBuffer> requestKeys = new ArrayList<ByteBuffer>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + requestKeys.add(keys.get(i).asByteBuffer()); + } + return requestKeys; + } + + /** + * Constructs an {@link EntryList} from the Iterable of entries while excluding the end slice + * (since the method contract states that the end slice is exclusive, yet Cassandra treats it as + * inclusive) and respecting the limit. + * + * @param entries + * @param getter + * @param lastColumn TODO: make this StaticBuffer so we can avoid the conversion and provide equals method + * @param limit + * @param <E> + * @return + */ + public static<E> EntryList makeEntryList(final Iterable<E> entries, + final StaticArrayEntry.GetColVal<E,ByteBuffer> getter, + final StaticBuffer lastColumn, final int limit) { + return StaticArrayEntryList.ofByteBuffer(new Iterable<E>() { + @Override + public Iterator<E> iterator() { + return Iterators.filter(entries.iterator(),new FilterResultColumns<E>(lastColumn,limit,getter)); + } + },getter); + } + + private static class FilterResultColumns<E> implements Predicate<E> { + + private int count = 0; + + private final int limit; + private final StaticBuffer lastColumn; + private final StaticArrayEntry.GetColVal<E,ByteBuffer> getter; + + private FilterResultColumns(StaticBuffer lastColumn, int limit, StaticArrayEntry.GetColVal<E, ByteBuffer> getter) { + this.limit = limit; + this.lastColumn = lastColumn; + this.getter = getter; + } + + @Override + public boolean apply(@Nullable E e) { + assert e!=null; + if (count>=limit || BufferUtil.equals(lastColumn, getter.getColumn(e))) return false; + count++; + return true; + } + + } + + public static<E> Iterator<Entry> makeEntryIterator(final Iterable<E> entries, + final StaticArrayEntry.GetColVal<E,ByteBuffer> getter, + final StaticBuffer lastColumn, final int limit) { + return Iterators.transform(Iterators.filter(entries.iterator(), + new FilterResultColumns<E>(lastColumn, limit, getter)), new Function<E, Entry>() { + @Nullable + @Override + public Entry apply(@Nullable E e) { + return StaticArrayEntry.ofByteBuffer(e,getter); + } + }); + } + + + public static KeyRange transformRange(Range<Token> range) { + return transformRange(range.left, range.right); + } + + public static KeyRange transformRange(Token leftKeyExclusive, Token rightKeyInclusive) { + if (!(leftKeyExclusive instanceof BytesToken)) + throw new UnsupportedOperationException(); + + // if left part is BytesToken, right part should be too, otherwise there is no sense in the ring + assert rightKeyInclusive instanceof BytesToken; + + // l is exclusive, r is inclusive + BytesToken l = (BytesToken) leftKeyExclusive; + BytesToken r = (BytesToken) rightKeyInclusive; + + byte[] leftTokenValue = l.getTokenValue(); + byte[] rightTokenValue = r.getTokenValue(); + + Preconditions.checkArgument(leftTokenValue.length == rightTokenValue.length, "Tokens have unequal length"); + int tokenLength = leftTokenValue.length; + + byte[][] tokens = new byte[][]{leftTokenValue, rightTokenValue}; + byte[][] plusOne = new byte[2][tokenLength]; + + for (int j = 0; j < 2; j++) { + boolean carry = true; + for (int i = tokenLength - 1; i >= 0; i--) { + byte b = tokens[j][i]; + if (carry) { + b++; + carry = false; + } + if (b == 0) carry = true; + plusOne[j][i] = b; + } + } + + StaticBuffer lb = StaticArrayBuffer.of(plusOne[0]); + StaticBuffer rb = StaticArrayBuffer.of(plusOne[1]); + Preconditions.checkArgument(lb.length() == tokenLength, lb.length()); + Preconditions.checkArgument(rb.length() == tokenLength, rb.length()); + + return new KeyRange(lb, rb); + } +} diff --git a/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java b/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java new file mode 100644 index 0000000..1bb8f1f --- /dev/null +++ b/src/main/java/org/apache/cassandra/thrift/TBinaryProtocol.java @@ -0,0 +1,21 @@ +package org.apache.cassandra.thrift; + +import org.apache.thrift.transport.TTransport; + +/** + * This is necessary until Astyanax is updated to "officially" support Cassandra 2.0.x. + * + * The story is as follows: Cassandra 2.0.x moved to the new version of Thrift (0.9.x) + * where problem with TBinaryProtocol was fixed, so TBinaryProtocol class was removed as no longer necessary. + * Astyanax in it's current state still wants to use TBinaryProtocol bundled with Cassandra, + * so this class is essentially tricking it (Astyanax) into believing that class is still there. + * + * No other changes necessary to make Astyanax work with Cassandra 2.0.x because Thrift API is completely in-tact. + */ + +@SuppressWarnings("unused") +public class TBinaryProtocol extends org.apache.thrift.protocol.TBinaryProtocol { + public TBinaryProtocol(TTransport trans) { + super(trans); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java b/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java new file mode 100644 index 0000000..3b70533 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/CassandraStorageSetup.java @@ -0,0 +1,182 @@ +package com.thinkaurelius.titan; + +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_KEYSPACE; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; + +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraDaemonWrapper; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*; + +public class CassandraStorageSetup { + + public static final String CONFDIR_SYSPROP = "test.cassandra.confdir"; + public static final String DATADIR_SYSPROP = "test.cassandra.datadir"; + + private static volatile Paths paths; + + private static final Logger log = LoggerFactory.getLogger(CassandraStorageSetup.class); + + private static synchronized Paths getPaths() { + if (null == paths) { + String yamlPath = "file://" + loadAbsoluteDirectoryPath("conf", CONFDIR_SYSPROP, true) + File.separator + "cassandra.yaml"; + String dataPath = loadAbsoluteDirectoryPath("data", DATADIR_SYSPROP, false); + paths = new Paths(yamlPath, dataPath); + } + return paths; + } + + private static ModifiableConfiguration getGenericConfiguration(String ks, String backend) { + ModifiableConfiguration config = buildGraphConfiguration(); + config.set(CASSANDRA_KEYSPACE, cleanKeyspaceName(ks)); + log.debug("Set keyspace name: {}", config.get(CASSANDRA_KEYSPACE)); + config.set(PAGE_SIZE,500); + config.set(CONNECTION_TIMEOUT, Duration.ofSeconds(60L)); + config.set(STORAGE_BACKEND, backend); + return config; + } + + + public static ModifiableConfiguration getEmbeddedConfiguration(String ks) { + ModifiableConfiguration config = getGenericConfiguration(ks, "embeddedcassandra"); + config.set(STORAGE_CONF_FILE, getPaths().yamlPath); + return config; + } + + public static ModifiableConfiguration getEmbeddedCassandraPartitionConfiguration(String ks) { + ModifiableConfiguration config = getEmbeddedConfiguration(ks); + config.set(IDS_FLUSH,false); + return config; + } + + public static WriteConfiguration getEmbeddedGraphConfiguration(String ks) { + return getEmbeddedConfiguration(ks).getConfiguration(); + } + + public static WriteConfiguration getEmbeddedCassandraPartitionGraphConfiguration(String ks) { + return getEmbeddedConfiguration(ks).getConfiguration(); + } + + public static ModifiableConfiguration getAstyanaxConfiguration(String ks) { + return getGenericConfiguration(ks, "astyanax"); + } + + public static ModifiableConfiguration getAstyanaxSSLConfiguration(String ks) { + return enableSSL(getGenericConfiguration(ks, "astyanax")); + } + + public static WriteConfiguration getAstyanaxGraphConfiguration(String ks) { + return getAstyanaxConfiguration(ks).getConfiguration(); + } + + public static ModifiableConfiguration getCassandraConfiguration(String ks) { + return getGenericConfiguration(ks, "cassandra"); + } + + public static WriteConfiguration getCassandraGraphConfiguration(String ks) { + return getCassandraConfiguration(ks).getConfiguration(); + } + + public static ModifiableConfiguration getCassandraThriftConfiguration(String ks) { + return getGenericConfiguration(ks, "cassandrathrift"); + } + + public static ModifiableConfiguration getCassandraThriftSSLConfiguration(String ks) { + return enableSSL(getGenericConfiguration(ks, "cassandrathrift")); + } + + public static WriteConfiguration getCassandraThriftGraphConfiguration(String ks) { + return getCassandraThriftConfiguration(ks).getConfiguration(); + } + + /** + * Load cassandra.yaml and data paths from the environment or from default + * values if nothing is set in the environment, then delete all existing + * data, and finally start Cassandra. + * <p> + * This method is idempotent. Calls after the first have no effect aside + * from logging statements. + */ + public static void startCleanEmbedded() { + startCleanEmbedded(getPaths()); + } + + /* + * Cassandra only accepts keyspace names 48 characters long or shorter made + * up of alphanumeric characters and underscores. + */ + public static String cleanKeyspaceName(String raw) { + Preconditions.checkNotNull(raw); + Preconditions.checkArgument(0 < raw.length()); + + if (48 < raw.length() || raw.matches("[^a-zA-Z_]")) { + return "strhash" + String.valueOf(Math.abs(raw.hashCode())); + } else { + return raw; + } + } + + private static ModifiableConfiguration enableSSL(ModifiableConfiguration mc) { + mc.set(AbstractCassandraStoreManager.SSL_ENABLED, true); + mc.set(STORAGE_HOSTS, new String[]{ "localhost" }); + mc.set(AbstractCassandraStoreManager.SSL_TRUSTSTORE_LOCATION, + Joiner.on(File.separator).join("target", "cassandra", "conf", "localhost-murmur-ssl", "test.truststore")); + mc.set(AbstractCassandraStoreManager.SSL_TRUSTSTORE_PASSWORD, "cassandra"); + return mc; + } + + private static void startCleanEmbedded(Paths p) { + if (!CassandraDaemonWrapper.isStarted()) { + try { + FileUtils.deleteDirectory(new File(p.dataPath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + CassandraDaemonWrapper.start(p.yamlPath); + } + + private static String loadAbsoluteDirectoryPath(String name, String prop, boolean mustExistAndBeAbsolute) { + String s = System.getProperty(prop); + + if (null == s) { + s = Joiner.on(File.separator).join(System.getProperty("user.dir"), "target", "cassandra", name, "localhost-bop"); + log.info("Set default Cassandra {} directory path {}", name, s); + } else { + log.info("Loaded Cassandra {} directory path {} from system property {}", new Object[] { name, s, prop }); + } + + if (mustExistAndBeAbsolute) { + File dir = new File(s); + Preconditions.checkArgument(dir.isDirectory(), "Path %s must be a directory", s); + Preconditions.checkArgument(dir.isAbsolute(), "Path %s must be absolute", s); + } + + return s; + } + + private static class Paths { + private final String yamlPath; + private final String dataPath; + + public Paths(String yamlPath, String dataPath) { + this.yamlPath = yamlPath; + this.dataPath = dataPath; + } + } +} diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java new file mode 100644 index 0000000..c77fed4 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphComputerProvider.java @@ -0,0 +1,22 @@ +package com.thinkaurelius.titan.blueprints.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.blueprints.AbstractTitanGraphComputerProvider; +import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraGraphComputer; +import org.apache.tinkerpop.gremlin.GraphProvider; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@GraphProvider.Descriptor(computer = FulgoraGraphComputer.class) +public class ThriftGraphComputerProvider extends AbstractTitanGraphComputerProvider { + + @Override + public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) { + CassandraStorageSetup.startCleanEmbedded(); + return CassandraStorageSetup.getCassandraThriftConfiguration(graphName); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java new file mode 100644 index 0000000..ce8ee76 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/ThriftGraphProvider.java @@ -0,0 +1,18 @@ +package com.thinkaurelius.titan.blueprints.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +public class ThriftGraphProvider extends AbstractTitanGraphProvider { + + @Override + public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) { + CassandraStorageSetup.startCleanEmbedded(); + return CassandraStorageSetup.getCassandraThriftConfiguration(graphName); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java new file mode 100644 index 0000000..3109b8a --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftComputerTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.blueprints.thrift.process; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphComputerProvider; +import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider; +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@RunWith(ProcessComputerSuite.class) +@GraphProviderClass(provider = ThriftGraphComputerProvider.class, graph = TitanGraph.class) +public class ThriftComputerTest { + +// TP3 ignores @BeforeClass -- the following method is never executed +// @BeforeClass +// public static void beforeSuite() { +// CassandraStorageSetup.startCleanEmbedded(); +// } + +}
\ No newline at end of file diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java new file mode 100644 index 0000000..0feb338 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/process/ThriftProcessTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.blueprints.thrift.process; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider; +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite; +import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@RunWith(ProcessStandardSuite.class) +@GraphProviderClass(provider = ThriftGraphProvider.class, graph = TitanGraph.class) +public class ThriftProcessTest { + +// TP3 ignores @BeforeClass -- the following method is never executed +// @BeforeClass +// public static void beforeSuite() { +// CassandraStorageSetup.startCleanEmbedded(); +// } + +} diff --git a/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java new file mode 100644 index 0000000..2d1d7eb --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/blueprints/thrift/structure/ThriftStructureTest.java @@ -0,0 +1,24 @@ +package com.thinkaurelius.titan.blueprints.thrift.structure; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.blueprints.thrift.ThriftGraphProvider; +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.structure.StructureStandardSuite; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@RunWith(StructureStandardSuite.class) +@GraphProviderClass(provider = ThriftGraphProvider.class, graph = TitanGraph.class) +public class ThriftStructureTest { + +// TP3 ignores @BeforeClass -- the following method is never executed +// @BeforeClass +// public static void beforeSuite() { +// CassandraStorageSetup.startCleanEmbedded(); +// } + +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java new file mode 100644 index 0000000..623a742 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreTest.java @@ -0,0 +1,136 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.Map; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.testcategory.OrderedKeyStoreTests; +import com.thinkaurelius.titan.testcategory.UnorderedKeyStoreTests; + +public abstract class AbstractCassandraStoreTest extends KeyColumnValueStoreTest { + + private static final Logger log = + LoggerFactory.getLogger(AbstractCassandraStoreTest.class); + private static final String TEST_CF_NAME = "testcf"; + private static final String DEFAULT_COMPRESSOR_PACKAGE = "org.apache.cassandra.io.compress"; + + public abstract ModifiableConfiguration getBaseStorageConfiguration(); + + public abstract AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException; + + @Test + @Category({ UnorderedKeyStoreTests.class }) + public void testUnorderedConfiguration() { + if (!manager.getFeatures().hasUnorderedScan()) { + log.warn( + "Can't test key-unordered features on incompatible store. " + + "This warning could indicate reduced test coverage and " + + "a broken JUnit configuration. Skipping test {}.", + name.getMethodName()); + return; + } + + StoreFeatures features = manager.getFeatures(); + assertFalse(features.isKeyOrdered()); + assertFalse(features.hasLocalKeyPartition()); + } + + @Test + @Category({ OrderedKeyStoreTests.class }) + public void testOrderedConfiguration() { + if (!manager.getFeatures().hasOrderedScan()) { + log.warn( + "Can't test key-ordered features on incompatible store. " + + "This warning could indicate reduced test coverage and " + + "a broken JUnit configuration. Skipping test {}.", + name.getMethodName()); + return; + } + + StoreFeatures features = manager.getFeatures(); + assertTrue(features.isKeyOrdered()); + } + + @Test + public void testDefaultCFCompressor() throws BackendException { + + final String cf = TEST_CF_NAME + "_snappy"; + + AbstractCassandraStoreManager mgr = openStorageManager(); + + mgr.openDatabase(cf); + + Map<String, String> defaultCfCompressionOps = + new ImmutableMap.Builder<String, String>() + .put("sstable_compression", DEFAULT_COMPRESSOR_PACKAGE + "." + AbstractCassandraStoreManager.CF_COMPRESSION_TYPE.getDefaultValue()) + .put("chunk_length_kb", "64") + .build(); + + assertEquals(defaultCfCompressionOps, mgr.getCompressionOptions(cf)); + } + + @Test + public void testCustomCFCompressor() throws BackendException { + + final String cname = "DeflateCompressor"; + final int ckb = 128; + final String cf = TEST_CF_NAME + "_gzip"; + + ModifiableConfiguration config = getBaseStorageConfiguration(); + config.set(AbstractCassandraStoreManager.CF_COMPRESSION_TYPE,cname); + config.set(AbstractCassandraStoreManager.CF_COMPRESSION_BLOCK_SIZE,ckb); + + AbstractCassandraStoreManager mgr = openStorageManager(config); + + // N.B.: clearStorage() truncates CFs but does not delete them + mgr.openDatabase(cf); + + final Map<String, String> expected = ImmutableMap + .<String, String> builder() + .put("sstable_compression", + DEFAULT_COMPRESSOR_PACKAGE + "." + cname) + .put("chunk_length_kb", String.valueOf(ckb)).build(); + + assertEquals(expected, mgr.getCompressionOptions(cf)); + } + + @Test + public void testDisableCFCompressor() throws BackendException { + + final String cf = TEST_CF_NAME + "_nocompress"; + + ModifiableConfiguration config = getBaseStorageConfiguration(); + config.set(AbstractCassandraStoreManager.CF_COMPRESSION,false); + AbstractCassandraStoreManager mgr = openStorageManager(config); + + // N.B.: clearStorage() truncates CFs but does not delete them + mgr.openDatabase(cf); + + assertEquals(Collections.emptyMap(), mgr.getCompressionOptions(cf)); + } + + @Test + public void testTTLSupported() throws Exception { + StoreFeatures features = manager.getFeatures(); + assertTrue(features.hasCellTTL()); + } + + @Override + public AbstractCassandraStoreManager openStorageManager() throws BackendException { + return openStorageManager(getBaseStorageConfiguration()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java new file mode 100644 index 0000000..c602f97 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/CassandraTransactionTest.java @@ -0,0 +1,79 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import com.google.common.base.Preconditions; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import com.thinkaurelius.titan.diskstorage.util.StandardBaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.util.time.TimestampProviders; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import org.junit.Test; + +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY; +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY; +import static org.junit.Assert.*; + +public class CassandraTransactionTest { + + /* testRead/WriteConsistencyLevel have unnecessary code duplication + * that could be avoided by creating a common helper method that takes + * a ConfigOption parameter and a function that converts a + * CassandraTransaction to a consistency level by calling either + * ct.getReadConsistencyLevel() or .getWriteConsistencyLevel(), + * but it doesn't seem worth the complexity. + */ + + @Test + public void testWriteConsistencyLevel() { + int levelsChecked = 0; + + // Test whether CassandraTransaction honors the write consistency level option + for (CLevel writeLevel : CLevel.values()) { + StandardBaseTransactionConfig.Builder b = new StandardBaseTransactionConfig.Builder(); + ModifiableConfiguration mc = GraphDatabaseConfiguration.buildGraphConfiguration(); + mc.set(CASSANDRA_WRITE_CONSISTENCY, writeLevel.name()); + b.customOptions(mc); + b.timestampProvider(TimestampProviders.MICRO); + CassandraTransaction ct = new CassandraTransaction(b.build()); + assertEquals(writeLevel, ct.getWriteConsistencyLevel()); + levelsChecked++; + } + + // Sanity check: if CLevel.values was empty, something is wrong with the test + Preconditions.checkState(0 < levelsChecked); + } + + @Test + public void testReadConsistencyLevel() { + int levelsChecked = 0; + + // Test whether CassandraTransaction honors the write consistency level option + for (CLevel writeLevel : CLevel.values()) { + StandardBaseTransactionConfig.Builder b = new StandardBaseTransactionConfig.Builder(); + ModifiableConfiguration mc = GraphDatabaseConfiguration.buildGraphConfiguration(); + mc.set(CASSANDRA_READ_CONSISTENCY, writeLevel.name()); + b.timestampProvider(TimestampProviders.MICRO); + b.customOptions(mc); + CassandraTransaction ct = new CassandraTransaction(b.build()); + assertEquals(writeLevel, ct.getReadConsistencyLevel()); + levelsChecked++; + } + + // Sanity check: if CLevel.values was empty, something is wrong with the test + Preconditions.checkState(0 < levelsChecked); + } + + @Test + public void testTimestampProvider() { + BaseTransactionConfig txcfg = StandardBaseTransactionConfig.of(TimestampProviders.NANO); + CassandraTransaction ct = new CassandraTransaction(txcfg); + assertEquals(TimestampProviders.NANO, ct.getConfiguration().getTimestampProvider()); + + txcfg = StandardBaseTransactionConfig.of(TimestampProviders.MICRO); + ct = new CassandraTransaction(txcfg); + assertEquals(TimestampProviders.MICRO, ct.getConfiguration().getTimestampProvider()); + + txcfg = StandardBaseTransactionConfig.of(TimestampProviders.MILLI); + ct = new CassandraTransaction(txcfg); + assertEquals(TimestampProviders.MILLI, ct.getConfiguration().getTimestampProvider()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java new file mode 100644 index 0000000..356e8ce --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/UUIDTest.java @@ -0,0 +1,34 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.testcategory.StandaloneTests; + +import java.nio.ByteBuffer; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +@Category({StandaloneTests.class}) +public class UUIDTest { + public static final String z = "00000000-0000-1000-0000-000000000000"; + public static final String v = "9451e273-7753-11e0-92df-e700f669bcfc"; + + @Test + public void timeUUIDComparison() { + TimeUUIDType ti = TimeUUIDType.instance; + + UUID zu = UUID.fromString(z); + UUID vu = UUID.fromString(v); + + ByteBuffer zb = ti.decompose(zu); + ByteBuffer vb = ti.decompose(vu); + + assertEquals(-1, ti.compare(zb, vb)); + assertEquals(1, zu.compareTo(vu)); + assertEquals(1, ti.compare(vb, zb)); + assertEquals(-1, vu.compareTo(zu)); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java new file mode 100644 index 0000000..662b60a --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxIDAuthorityTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.IDAuthorityTest; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.junit.BeforeClass; + +public class AstyanaxIDAuthorityTest extends IDAuthorityTest { + + public AstyanaxIDAuthorityTest(WriteConfiguration baseConfig) { + super(baseConfig); + } + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java new file mode 100644 index 0000000..9e47088 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLockStoreTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class AstyanaxLockStoreTest extends LockKeyColumnValueStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException { + return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java new file mode 100644 index 0000000..339261f --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxLogTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest; +import com.thinkaurelius.titan.testcategory.SerialTests; + +@Category(SerialTests.class) +public class AstyanaxLogTest extends KCVSLogTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName())); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java new file mode 100644 index 0000000..6de8238 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxMultiWriteStoreTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class AstyanaxMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new AstyanaxStoreManager(CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java new file mode 100644 index 0000000..48213b0 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxSSLStoreTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import com.thinkaurelius.titan.testcategory.CassandraSSLTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ CassandraSSLTests.class }) +public class AstyanaxSSLStoreTest extends AstyanaxStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public ModifiableConfiguration getBaseStorageConfiguration() { + return CassandraStorageSetup.getAstyanaxSSLConfiguration(getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java new file mode 100644 index 0000000..3719ff6 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxStoreTest.java @@ -0,0 +1,28 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; + +public class AstyanaxStoreTest extends AbstractCassandraStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public ModifiableConfiguration getBaseStorageConfiguration() { + return CassandraStorageSetup.getAstyanaxConfiguration(getClass().getSimpleName()); + } + + @Override + public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException { + return new AstyanaxStoreManager(c); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java new file mode 100644 index 0000000..252889a --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedIDAuthorityTest.java @@ -0,0 +1,19 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.IDAuthorityTest; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class EmbeddedIDAuthorityTest extends IDAuthorityTest { + + public EmbeddedIDAuthorityTest(WriteConfiguration baseConfig) { + super(baseConfig); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java new file mode 100644 index 0000000..960b802 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLockStoreTest.java @@ -0,0 +1,14 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class EmbeddedLockStoreTest extends LockKeyColumnValueStoreTest { + + @Override + public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException { + return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java new file mode 100644 index 0000000..36436e7 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedLogTest.java @@ -0,0 +1,24 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest; +import com.thinkaurelius.titan.testcategory.SerialTests; + +@Category(SerialTests.class) +public class EmbeddedLogTest extends KCVSLogTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java new file mode 100644 index 0000000..d4608c9 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedMultiWriteStoreTest.java @@ -0,0 +1,15 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import com.thinkaurelius.titan.diskstorage.BackendException; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class EmbeddedMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest { + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraEmbeddedStoreManager(CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java new file mode 100644 index 0000000..e2a9bc0 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/embedded/EmbeddedStoreTest.java @@ -0,0 +1,42 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.embedded; + +import static org.junit.Assert.assertTrue; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.testcategory.OrderedKeyStoreTests; + +public class EmbeddedStoreTest extends AbstractCassandraStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public ModifiableConfiguration getBaseStorageConfiguration() { + return CassandraStorageSetup.getEmbeddedConfiguration(getClass().getSimpleName()); + } + + @Override + public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException { + return new CassandraEmbeddedStoreManager(c); + } + + @Test + @Category({ OrderedKeyStoreTests.class }) + public void testConfiguration() { + StoreFeatures features = manager.getFeatures(); + assertTrue(features.isKeyOrdered()); + assertTrue(features.hasLocalKeyPartition()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java new file mode 100644 index 0000000..9e16585 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftDistributedStoreManagerTest.java @@ -0,0 +1,30 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.DistributedStoreManagerTest; + +public class ThriftDistributedStoreManagerTest extends DistributedStoreManagerTest<CassandraThriftStoreManager> { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Before + public void setUp() throws BackendException { + manager = new CassandraThriftStoreManager( + CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName())); + store = manager.openDatabase("distributedcf"); + } + + @After + public void tearDown() throws BackendException { + if (null != manager) + manager.close(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java new file mode 100644 index 0000000..9e1881a --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftIDAuthorityTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.IDAuthorityTest; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.junit.BeforeClass; + +public class ThriftIDAuthorityTest extends IDAuthorityTest { + + public ThriftIDAuthorityTest(WriteConfiguration baseConfig) { + super(baseConfig); + } + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java new file mode 100644 index 0000000..223d399 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLockStoreTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +public class ThriftLockStoreTest extends LockKeyColumnValueStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager(int idx) throws BackendException { + return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java new file mode 100644 index 0000000..7123b5f --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftLogTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.log.KCVSLogTest; +import com.thinkaurelius.titan.testcategory.SerialTests; + +@Category(SerialTests.class) +public class ThriftLogTest extends KCVSLogTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName())); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java new file mode 100644 index 0000000..cbfce61 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftMultiWriteStoreTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; + +import org.junit.BeforeClass; + +public class ThriftMultiWriteStoreTest extends MultiWriteKeyColumnValueStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraThriftStoreManager(CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName())); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java new file mode 100644 index 0000000..ffc93b5 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftSSLStoreTest.java @@ -0,0 +1,22 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import com.thinkaurelius.titan.testcategory.CassandraSSLTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ CassandraSSLTests.class }) +public class ThriftSSLStoreTest extends ThriftStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public ModifiableConfiguration getBaseStorageConfiguration() { + return CassandraStorageSetup.getCassandraThriftSSLConfiguration(this.getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java new file mode 100644 index 0000000..8254530 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/ThriftStoreTest.java @@ -0,0 +1,28 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.thrift; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreTest; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; + +public class ThriftStoreTest extends AbstractCassandraStoreTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public ModifiableConfiguration getBaseStorageConfiguration() { + return CassandraStorageSetup.getCassandraThriftConfiguration(this.getClass().getSimpleName()); + } + + @Override + public AbstractCassandraStoreManager openStorageManager(Configuration c) throws BackendException { + return new CassandraThriftStoreManager(c); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java new file mode 100644 index 0000000..0aab994 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/CassandraGraphTest.java @@ -0,0 +1,95 @@ +package com.thinkaurelius.titan.graphdb; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.core.TitanFactory; +import com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph; +import com.thinkaurelius.titan.graphdb.transaction.StandardTitanTx; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.*; + +/** + * @author Joshua Shinavier (http://fortytwo.net) + */ +public abstract class CassandraGraphTest extends TitanGraphTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + protected boolean isLockingOptimistic() { + return true; + } + + @Test + public void testHasTTL() throws Exception { + assertTrue(features.hasCellTTL()); + } + + @Test + public void testGraphConfigUsedByThreadBoundTx() { + close(); + WriteConfiguration wc = getConfiguration(); + wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ALL"); + wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "LOCAL_QUORUM"); + + graph = (StandardTitanGraph) TitanFactory.open(wc); + + StandardTitanTx tx = (StandardTitanTx)graph.getCurrentThreadTx(); + assertEquals("ALL", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY)); + assertEquals("LOCAL_QUORUM", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY)); + } + + @Test + public void testGraphConfigUsedByTx() { + close(); + WriteConfiguration wc = getConfiguration(); + wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "TWO"); + wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "THREE"); + + graph = (StandardTitanGraph) TitanFactory.open(wc); + + StandardTitanTx tx = (StandardTitanTx)graph.newTransaction(); + assertEquals("TWO", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY)); + assertEquals("THREE", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY)); + tx.rollback(); + } + + @Test + public void testCustomConfigUsedByTx() { + close(); + WriteConfiguration wc = getConfiguration(); + wc.set(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ALL"); + wc.set(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "ALL"); + + graph = (StandardTitanGraph) TitanFactory.open(wc); + + StandardTitanTx tx = (StandardTitanTx)graph.buildTransaction() + .customOption(ConfigElement.getPath(CASSANDRA_READ_CONSISTENCY), "ONE") + .customOption(ConfigElement.getPath(CASSANDRA_WRITE_CONSISTENCY), "TWO").start(); + + assertEquals("ONE", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_READ_CONSISTENCY)); + assertEquals("TWO", + tx.getTxHandle().getBaseTransactionConfig().getCustomOptions() + .get(AbstractCassandraStoreManager.CASSANDRA_WRITE_CONSISTENCY)); + tx.rollback(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java new file mode 100644 index 0000000..3a3c0b8 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphConcurrentTest.java @@ -0,0 +1,24 @@ +package com.thinkaurelius.titan.graphdb.astyanax; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest; +import com.thinkaurelius.titan.testcategory.PerformanceTests; + +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({PerformanceTests.class}) +public class AstyanaxGraphConcurrentTest extends TitanGraphConcurrentTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java new file mode 100644 index 0000000..c20e837 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphPerformanceMemoryTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.astyanax; + +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest; + +public class AstyanaxGraphPerformanceMemoryTest extends TitanGraphPerformanceMemoryTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java new file mode 100644 index 0000000..70f595e --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxGraphTest.java @@ -0,0 +1,13 @@ +package com.thinkaurelius.titan.graphdb.astyanax; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.CassandraGraphTest; + +public class AstyanaxGraphTest extends CassandraGraphTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java new file mode 100644 index 0000000..c8fb206 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/astyanax/AstyanaxPartitionGraphTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.astyanax; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphTest; +import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest; +import org.junit.BeforeClass; + +public class AstyanaxPartitionGraphTest extends TitanPartitionGraphTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getBaseConfiguration() { + return CassandraStorageSetup.getAstyanaxGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java new file mode 100644 index 0000000..4a4fc0f --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedEventualGraphTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.embedded; + +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest; + +public class EmbeddedEventualGraphTest extends TitanEventualGraphTest { + + @BeforeClass + public static void startEmbeddedCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java new file mode 100644 index 0000000..35cc582 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphConcurrentTest.java @@ -0,0 +1,28 @@ +package com.thinkaurelius.titan.graphdb.embedded; + +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest; +import com.thinkaurelius.titan.testcategory.PerformanceTests; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ + +@Category({PerformanceTests.class}) +public class EmbeddedGraphConcurrentTest extends TitanGraphConcurrentTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java new file mode 100644 index 0000000..cd9d75d --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphMemoryPerformanceTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.graphdb.embedded; + +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ + +public class EmbeddedGraphMemoryPerformanceTest extends TitanGraphPerformanceMemoryTest { + + @BeforeClass + public static void startCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java new file mode 100644 index 0000000..a602e71 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedGraphTest.java @@ -0,0 +1,13 @@ +package com.thinkaurelius.titan.graphdb.embedded; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.CassandraGraphTest; + +public class EmbeddedGraphTest extends CassandraGraphTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java new file mode 100644 index 0000000..4851b3f --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/embedded/EmbeddedPartitionGraphTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.embedded; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest; +import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest; +import org.junit.BeforeClass; + +public class EmbeddedPartitionGraphTest extends TitanPartitionGraphTest { + + @BeforeClass + public static void startEmbeddedCassandra() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getBaseConfiguration() { + return CassandraStorageSetup.getEmbeddedCassandraPartitionGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java new file mode 100644 index 0000000..8fe1362 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftEventualGraphTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanEventualGraphTest; +import com.thinkaurelius.titan.graphdb.TitanGraphTest; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +public class ThriftEventualGraphTest extends TitanEventualGraphTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java new file mode 100644 index 0000000..17848d4 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphCacheTest.java @@ -0,0 +1,28 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.StorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphTest; +import org.junit.BeforeClass; + +public class ThriftGraphCacheTest extends TitanGraphTest { + + @Override + public WriteConfiguration getConfiguration() { + return StorageSetup.addPermanentCache(CassandraStorageSetup.getCassandraThriftConfiguration(getClass().getSimpleName())); + } + + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } + + + + @Override + protected boolean isLockingOptimistic() { + return true; + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java new file mode 100644 index 0000000..f1d4fe4 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphConcurrentTest.java @@ -0,0 +1,24 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest; +import com.thinkaurelius.titan.testcategory.PerformanceTests; + +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({PerformanceTests.class}) +public class ThriftGraphConcurrentTest extends TitanGraphConcurrentTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java new file mode 100644 index 0000000..dd45c42 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphIterativeTest.java @@ -0,0 +1,30 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.cassandra.thrift.CassandraThriftStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.BasicConfiguration; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.graphdb.TitanGraphIterativeBenchmark; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import org.junit.BeforeClass; + +public class ThriftGraphIterativeTest extends TitanGraphIterativeBenchmark { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + @Override + public KeyColumnValueStoreManager openStorageManager() throws BackendException { + return new CassandraThriftStoreManager(new BasicConfiguration(GraphDatabaseConfiguration.ROOT_NS,getConfiguration(), BasicConfiguration.Restriction.NONE)); + } + + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java new file mode 100644 index 0000000..893b6c8 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphPerformanceMemoryTest.java @@ -0,0 +1,22 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import org.apache.commons.configuration.Configuration; +import org.junit.BeforeClass; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest; + +public class ThriftGraphPerformanceMemoryTest extends TitanGraphPerformanceMemoryTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java new file mode 100644 index 0000000..6f01f0b --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphSpeedTest.java @@ -0,0 +1,56 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.graphdb.SpeedTestSchema; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.core.TitanFactory; +import com.thinkaurelius.titan.graphdb.TitanGraphSpeedTest; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph; +import com.thinkaurelius.titan.testcategory.PerformanceTests; + +@Category({PerformanceTests.class}) +public class ThriftGraphSpeedTest extends TitanGraphSpeedTest { + + private static StandardTitanGraph graph; + private static SpeedTestSchema schema; + + private static final Logger log = + LoggerFactory.getLogger(ThriftGraphSpeedTest.class); + + public ThriftGraphSpeedTest() throws BackendException { + super(CassandraStorageSetup.getCassandraThriftGraphConfiguration(ThriftGraphSpeedTest.class.getSimpleName())); + } + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + protected StandardTitanGraph getGraph() throws BackendException { + + + if (null == graph) { + GraphDatabaseConfiguration graphconfig = new GraphDatabaseConfiguration(conf); + graphconfig.getBackend().clearStorage(); + log.debug("Cleared backend storage"); + graph = (StandardTitanGraph)TitanFactory.open(conf); + initializeGraph(graph); + } + return graph; + } + + @Override + protected SpeedTestSchema getSchema() { + if (null == schema) { + schema = SpeedTestSchema.get(); + } + return schema; + } +}
\ No newline at end of file diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java new file mode 100644 index 0000000..e9a9ac9 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftGraphTest.java @@ -0,0 +1,13 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.CassandraGraphTest; + +public class ThriftGraphTest extends CassandraGraphTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java new file mode 100644 index 0000000..8679da9 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOLAPTest.java @@ -0,0 +1,20 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.olap.OLAPTest; +import org.junit.BeforeClass; + +public class ThriftOLAPTest extends OLAPTest { + + @Override + public WriteConfiguration getConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java new file mode 100644 index 0000000..9bae61d --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftOperationCountingTest.java @@ -0,0 +1,25 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanOperationCountingTest; +import org.junit.BeforeClass; + +public class ThriftOperationCountingTest extends TitanOperationCountingTest { + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getBaseConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + + @Override + public boolean storeUsesConsistentKeyLocker() { + return true; + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java new file mode 100644 index 0000000..728e338 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/graphdb/thrift/ThriftPartitionGraphTest.java @@ -0,0 +1,21 @@ +package com.thinkaurelius.titan.graphdb.thrift; + +import com.thinkaurelius.titan.CassandraStorageSetup; +import com.thinkaurelius.titan.diskstorage.configuration.WriteConfiguration; +import com.thinkaurelius.titan.graphdb.TitanOperationCountingTest; +import com.thinkaurelius.titan.graphdb.TitanPartitionGraphTest; +import org.junit.BeforeClass; + +public class ThriftPartitionGraphTest extends TitanPartitionGraphTest { + + @BeforeClass + public static void beforeClass() { + CassandraStorageSetup.startCleanEmbedded(); + } + + @Override + public WriteConfiguration getBaseConfiguration() { + return CassandraStorageSetup.getCassandraThriftGraphConfiguration(getClass().getSimpleName()); + } + +} diff --git a/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java b/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java new file mode 100644 index 0000000..d1e3dd8 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/testcategory/CassandraSSLTests.java @@ -0,0 +1,10 @@ +package com.thinkaurelius.titan.testcategory; + +/** + * This is a JUnit category for tests that need to run against Cassandra + * configured for SSL-based client authentication. + * + * If you rename or move this class, then you must also update mentions of it in + * the Cassandra module's pom.xml. + */ +public interface CassandraSSLTests { } diff --git a/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java b/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java new file mode 100644 index 0000000..722c314 --- /dev/null +++ b/src/test/java/com/thinkaurelius/titan/testcategory/StandaloneTests.java @@ -0,0 +1,9 @@ +package com.thinkaurelius.titan.testcategory; + +/** + * This is a JUnit category for tests that don't need a Cassandra server. + * + * If you rename or move this class, then you must also update mentions of it in + * the Cassandra module's pom.xml. + */ +public class StandaloneTests { } diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..271e198 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,35 @@ +# A1 is a FileAppender. +log4j.appender.A1=org.apache.log4j.FileAppender +log4j.appender.A1.File=target/test.log +log4j.appender.A1.Threshold=TRACE +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{2}: %m%n + +# A2 is a ConsoleAppender. +log4j.appender.A2=org.apache.log4j.ConsoleAppender +log4j.appender.A2.Threshold=WARN +# A2 uses PatternLayout. +log4j.appender.A2.layout=org.apache.log4j.PatternLayout +log4j.appender.A2.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{2}: %m%n + +# Set both appenders (A1 and A2) on the root logger. +log4j.rootLogger=INFO, A1, A2 +#log4j.rootLogger=INFO, A1 + +# Restrict some of Titan's dependencies to INFO and scarier. +# These restrictions are useful when reducing the severity threshold +# setting on one of the appenders below INFO. +log4j.logger.org.apache.cassandra=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO +# Disable all messages from ExpectedValueCheckingTransaction. The point is to +# suppress scary-looking ERROR messages that are deliberately induced by +# LockKeyColumnValueStoreTest. +log4j.logger.com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction=OFF +# Lower dblog-related message thresholds +log4j.logger.com.thinkaurelius.titan.diskstorage.log.kcvs.KCVSLog=DEBUG +log4j.logger.com.thinkaurelius.titan.diskstorage.log.LogTest=DEBUG +log4j.logger.com.thinkaurelius.titan.diskstorage.log.KCVSLogTest=DEBUG +log4j.logger.com.thinkaurelius.titan.diskstorage.log.util.ProcessMessageJob=DEBUG +log4j.logger.com.thinkaurelius.titan.diskstorage.util.time.Timestamps=DEBUG diff --git a/src/test/resources/rexster-fragment.xml b/src/test/resources/rexster-fragment.xml new file mode 100644 index 0000000..19f88f9 --- /dev/null +++ b/src/test/resources/rexster-fragment.xml @@ -0,0 +1,13 @@ +<?xml version="1.0" encoding="UTF-8"?> +<graph> + <graph-read-only>false</graph-read-only> + <graph-location>home</graph-location> + <properties> + <storage.backend>local</storage.backend> + </properties> + <extensions> + <allows> + <allow>tp:gremlin</allow> + </allows> + </extensions> +</graph> |