diff options
Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java')
-rw-r--r-- | src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java | 297 |
1 files changed, 297 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java new file mode 100644 index 0000000..1e12ada --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/AbstractCassandraStoreManager.java @@ -0,0 +1,297 @@ +package com.thinkaurelius.titan.diskstorage.cassandra; + +import java.util.*; + +import com.google.common.collect.ImmutableMap; +import com.thinkaurelius.titan.core.TitanException; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; + +import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.*; + +import org.apache.cassandra.dht.IPartitioner; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Matthias Broecheler (me@matthiasb.com) + */ +@PreInitializeConfigOptions +public abstract class AbstractCassandraStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager { + + public enum Partitioner { + + RANDOM, BYTEORDER; + + public static Partitioner getPartitioner(IPartitioner partitioner) { + return getPartitioner(partitioner.getClass().getSimpleName()); + } + + public static Partitioner getPartitioner(String className) { + if (className.endsWith("RandomPartitioner") || className.endsWith("Murmur3Partitioner")) + return Partitioner.RANDOM; + else if (className.endsWith("ByteOrderedPartitioner")) return Partitioner.BYTEORDER; + else throw new IllegalArgumentException("Unsupported partitioner: " + className); + } + } + + //################### CASSANDRA SPECIFIC CONFIGURATION OPTIONS ###################### + + public static final ConfigNamespace CASSANDRA_NS = + new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "cassandra", "Cassandra storage backend options"); + + public static final ConfigOption<String> CASSANDRA_KEYSPACE = + new ConfigOption<String>(CASSANDRA_NS, "keyspace", + "The name of Titan's keyspace. It will be created if it does not exist.", + ConfigOption.Type.LOCAL, "titan"); + + // Consistency Levels and Atomic Batch + public static final ConfigOption<String> CASSANDRA_READ_CONSISTENCY = + new ConfigOption<String>(CASSANDRA_NS, "read-consistency-level", + "The consistency level of read operations against Cassandra", + ConfigOption.Type.MASKABLE, "LOCAL_QUORUM"); + + public static final ConfigOption<String> CASSANDRA_WRITE_CONSISTENCY = + new ConfigOption<String>(CASSANDRA_NS, "write-consistency-level", + "The consistency level of write operations against Cassandra", + ConfigOption.Type.MASKABLE, "LOCAL_QUORUM"); + + public static final ConfigOption<Boolean> ATOMIC_BATCH_MUTATE = + new ConfigOption<Boolean>(CASSANDRA_NS, "atomic-batch-mutate", + "True to use Cassandra atomic batch mutation, false to use non-atomic batches", + ConfigOption.Type.MASKABLE, true); + + // Replication + public static final ConfigOption<Integer> REPLICATION_FACTOR = + new ConfigOption<Integer>(CASSANDRA_NS, "replication-factor", + "The number of data replicas (including the original copy) that should be kept. " + + "This is only meaningful for storage backends that natively support data replication.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<String> REPLICATION_STRATEGY = + new ConfigOption<String>(CASSANDRA_NS, "replication-strategy-class", + "The replication strategy to use for Titan keyspace", + ConfigOption.Type.FIXED, "org.apache.cassandra.locator.SimpleStrategy"); + + public static final ConfigOption<String[]> REPLICATION_OPTIONS = + new ConfigOption<String[]>(CASSANDRA_NS, "replication-strategy-options", + "Replication strategy options, e.g. factor or replicas per datacenter. This list is interpreted as a " + + "map. It must have an even number of elements in [key,val,key,val,...] form. A replication_factor set " + + "here takes precedence over one set with " + ConfigElement.getPath(REPLICATION_FACTOR), + ConfigOption.Type.FIXED, String[].class); + + // Compression + public static final ConfigOption<Boolean> CF_COMPRESSION = + new ConfigOption<Boolean>(CASSANDRA_NS, "compression", + "Whether the storage backend should use compression when storing the data", ConfigOption.Type.FIXED, true); + + public static final ConfigOption<String> CF_COMPRESSION_TYPE = + new ConfigOption<String>(CASSANDRA_NS, "compression-type", + "The sstable_compression value Titan uses when creating column families. " + + "This accepts any value allowed by Cassandra's sstable_compression option. " + + "Leave this unset to disable sstable_compression on Titan-created CFs.", + ConfigOption.Type.MASKABLE, "LZ4Compressor"); + + public static final ConfigOption<Integer> CF_COMPRESSION_BLOCK_SIZE = + new ConfigOption<Integer>(CASSANDRA_NS, "compression-block-size", + "The size of the compression blocks in kilobytes", ConfigOption.Type.FIXED, 64); + + // SSL + public static final ConfigNamespace SSL_NS = + new ConfigNamespace(CASSANDRA_NS, "ssl", "Configuration options for SSL"); + + public static final ConfigNamespace SSL_TRUSTSTORE_NS = + new ConfigNamespace(SSL_NS, "truststore", "Configuration options for SSL Truststore."); + + public static final ConfigOption<Boolean> SSL_ENABLED = + new ConfigOption<Boolean>(SSL_NS, "enabled", + "Controls use of the SSL connection to Cassandra", ConfigOption.Type.LOCAL, false); + + public static final ConfigOption<String> SSL_TRUSTSTORE_LOCATION = + new ConfigOption<String>(SSL_TRUSTSTORE_NS, "location", + "Marks the location of the SSL Truststore.", ConfigOption.Type.LOCAL, ""); + + public static final ConfigOption<String> SSL_TRUSTSTORE_PASSWORD = + new ConfigOption<String>(SSL_TRUSTSTORE_NS, "password", + "The password to access SSL Truststore.", ConfigOption.Type.LOCAL, ""); + + // Thrift transport + public static final ConfigOption<Integer> THRIFT_FRAME_SIZE_MB = + new ConfigOption<>(CASSANDRA_NS, "frame-size-mb", + "The thrift frame size in megabytes", ConfigOption.Type.MASKABLE, 15); + + /** + * The default Thrift port used by Cassandra. Set + * {@link GraphDatabaseConfiguration#STORAGE_PORT} to override. + * <p> + * Value = {@value} + */ + public static final int PORT_DEFAULT = 9160; + + public static final String SYSTEM_KS = "system"; + + protected final String keySpaceName; + protected final Map<String, String> strategyOptions; + + protected final boolean compressionEnabled; + protected final int compressionChunkSizeKB; + protected final String compressionClass; + + protected final boolean atomicBatch; + + protected final int thriftFrameSizeBytes; + + private volatile StoreFeatures features = null; + private Partitioner partitioner = null; + + private static final Logger log = + LoggerFactory.getLogger(AbstractCassandraStoreManager.class); + + public AbstractCassandraStoreManager(Configuration config) { + super(config, PORT_DEFAULT); + + this.keySpaceName = config.get(CASSANDRA_KEYSPACE); + this.compressionEnabled = config.get(CF_COMPRESSION); + this.compressionChunkSizeKB = config.get(CF_COMPRESSION_BLOCK_SIZE); + this.compressionClass = config.get(CF_COMPRESSION_TYPE); + this.atomicBatch = config.get(ATOMIC_BATCH_MUTATE); + this.thriftFrameSizeBytes = config.get(THRIFT_FRAME_SIZE_MB) * 1024 * 1024; + + // SSL truststore location sanity check + if (config.get(SSL_ENABLED) && config.get(SSL_TRUSTSTORE_LOCATION).isEmpty()) + throw new IllegalArgumentException(SSL_TRUSTSTORE_LOCATION.getName() + " could not be empty when SSL is enabled."); + + if (config.has(REPLICATION_OPTIONS)) { + String[] options = config.get(REPLICATION_OPTIONS); + + if (options.length % 2 != 0) + throw new IllegalArgumentException(REPLICATION_OPTIONS.getName() + " should have even number of elements."); + + Map<String, String> converted = new HashMap<String, String>(options.length / 2); + + for (int i = 0; i < options.length; i += 2) { + converted.put(options[i], options[i + 1]); + } + + this.strategyOptions = ImmutableMap.copyOf(converted); + } else { + this.strategyOptions = ImmutableMap.of("replication_factor", String.valueOf(config.get(REPLICATION_FACTOR))); + } + } + + public final Partitioner getPartitioner() { + if (partitioner == null) { + try { + partitioner = Partitioner.getPartitioner(getCassandraPartitioner()); + } catch (BackendException e) { + throw new TitanException("Could not connect to Cassandra to read partitioner information. Please check the connection", e); + } + } + assert partitioner != null; + return partitioner; + } + + public abstract IPartitioner getCassandraPartitioner() throws BackendException; + + @Override + public StoreTransaction beginTransaction(final BaseTransactionConfig config) { + return new CassandraTransaction(config); + } + + @Override + public String toString() { + return "[" + keySpaceName + "@" + super.toString() + "]"; + } + + @Override + public StoreFeatures getFeatures() { + + if (features == null) { + + Configuration global = GraphDatabaseConfiguration.buildGraphConfiguration() + .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM") + .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM") + .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT); + + Configuration local = GraphDatabaseConfiguration.buildGraphConfiguration() + .set(CASSANDRA_READ_CONSISTENCY, "LOCAL_QUORUM") + .set(CASSANDRA_WRITE_CONSISTENCY, "LOCAL_QUORUM") + .set(METRICS_PREFIX, GraphDatabaseConfiguration.METRICS_SYSTEM_PREFIX_DEFAULT); + + StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder(); + + fb.batchMutation(true).distributed(true); + fb.timestamps(true).cellTTL(true); + fb.keyConsistent(global, local); + + boolean keyOrdered; + + switch (getPartitioner()) { + case RANDOM: + keyOrdered = false; + fb.keyOrdered(keyOrdered).orderedScan(false).unorderedScan(true); + break; + + case BYTEORDER: + keyOrdered = true; + fb.keyOrdered(keyOrdered).orderedScan(true).unorderedScan(false); + break; + + default: + throw new IllegalArgumentException("Unrecognized partitioner: " + getPartitioner()); + } + + switch (getDeployment()) { + case REMOTE: + fb.multiQuery(true); + break; + + case LOCAL: + fb.multiQuery(true).localKeyPartition(keyOrdered); + break; + + case EMBEDDED: + fb.multiQuery(false).localKeyPartition(keyOrdered); + break; + + default: + throw new IllegalArgumentException("Unrecognized deployment mode: " + getDeployment()); + } + + features = fb.build(); + } + + return features; + } + + /** + * Returns a map of compression options for the column family {@code cf}. + * The contents of the returned map must be identical to the contents of the + * map returned by + * {@link org.apache.cassandra.thrift.CfDef#getCompression_options()}, even + * for implementations of this method that don't use Thrift. + * + * @param cf the name of the column family for which to return compression + * options + * @return map of compression option names to compression option values + * @throws com.thinkaurelius.titan.diskstorage.BackendException if reading from Cassandra fails + */ + public abstract Map<String, String> getCompressionOptions(String cf) throws BackendException; + + public String getName() { + return getClass().getSimpleName() + keySpaceName; + } + +} |