Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java')
-rw-r--r--   src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java   536
1 file changed, 536 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java
new file mode 100644
index 0000000..e44ab92
--- /dev/null
+++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/thrift/CassandraThriftKeyColumnValueStore.java
@@ -0,0 +1,536 @@
+package com.thinkaurelius.titan.diskstorage.cassandra.thrift;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.thinkaurelius.titan.diskstorage.*;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnection;
+import com.thinkaurelius.titan.diskstorage.cassandra.thrift.thriftpool.CTConnectionPool;
+import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*;
+import com.thinkaurelius.titan.diskstorage.util.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx;
+
+/**
+ * A Titan {@code KeyColumnValueStore} backed by Cassandra.
+ * This uses the Cassandra Thrift API.
+ *
+ * @author Dan LaRocque <dalaro@hopcount.org>
+ * @see CassandraThriftStoreManager
+ */
+public class CassandraThriftKeyColumnValueStore implements KeyColumnValueStore {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(CassandraThriftKeyColumnValueStore.class);
+
+ private static final Pattern BROKEN_BYTE_TOKEN_PATTERN = Pattern.compile("^Token\\(bytes\\[(.+)\\]\\)$");
+
+ // Cassandra access
+ private final CassandraThriftStoreManager storeManager;
+ private final String keyspace;
+ private final String columnFamily;
+ private final CTConnectionPool pool;
+ private final ThriftGetter entryGetter;
+
+ public CassandraThriftKeyColumnValueStore(String keyspace, String columnFamily, CassandraThriftStoreManager storeManager,
+ CTConnectionPool pool) {
+ this.storeManager = storeManager;
+ this.keyspace = keyspace;
+ this.columnFamily = columnFamily;
+ this.pool = pool;
+ this.entryGetter = new ThriftGetter(storeManager.getMetaDataSchema(columnFamily));
+ }
+
+ /**
+ * Call Cassandra's Thrift get_slice() method.
+ * <p/>
+ * When columnEnd equals columnStart and either startInclusive
+ * or endInclusive is false (or both are false), then this
+ * method returns an empty list without making any Thrift calls.
+ * <p/>
+ * If columnEnd = columnStart + 1, and both startInclusive and
+ * endInclusive are false, then the arguments effectively form
+ * an empty interval. In this case, as in the previous one,
+ * an empty list is returned. However, it may not be handled
+ * as efficiently; a Thrift call might still be made before
+ * returning the empty list.
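+ * <p/>
+ * Illustrative example of the first case: a {@code KeySliceQuery} whose
+ * slice start equals its slice end, with both bounds non-empty,
+ * short-circuits in {@code getNamesSlice} and this method returns
+ * {@code EntryList.EMPTY_LIST} without a Thrift round trip.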
+ *
+ * @throws com.thinkaurelius.titan.diskstorage.BackendException
+ * when columnEnd < columnStart
+ */
+ @Override
+ public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh);
+ return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST);
+ }
+
+ @Override
+ public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(keys, query, txh);
+ }
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key,
+ SliceQuery query, StoreTransaction txh) throws BackendException {
+ return getNamesSlice(ImmutableList.of(key),query,txh);
+ }
+
+ public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys,
+ SliceQuery query,
+ StoreTransaction txh) throws BackendException {
+ ColumnParent parent = new ColumnParent(columnFamily);
+ /*
+ * Cassandra cannot handle columnStart = columnEnd.
+ * Cassandra's Thrift getSlice() throws InvalidRequestException
+ * if columnStart = columnEnd.
+ */
+ if (query.getSliceStart().compareTo(query.getSliceEnd()) >= 0) {
+ // Check for invalid arguments where columnEnd < columnStart
+ if (query.getSliceEnd().compareTo(query.getSliceStart())<0) {
+ throw new PermanentBackendException("columnStart=" + query.getSliceStart() +
+ " is greater than columnEnd=" + query.getSliceEnd() + ". " +
+ "columnStart must be less than or equal to columnEnd");
+ }
+ if (0 != query.getSliceStart().length() && 0 != query.getSliceEnd().length()) {
+ logger.debug("Return empty list due to columnEnd==columnStart and neither empty");
+ return KCVSUtil.emptyResults(keys);
+ }
+ }
+
+ assert query.getSliceStart().compareTo(query.getSliceEnd()) < 0;
+ ConsistencyLevel consistency = getTx(txh).getReadConsistencyLevel().getThrift();
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange();
+ range.setCount(query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column
+ range.setStart(query.getSliceStart().asByteBuffer());
+ range.setFinish(query.getSliceEnd().asByteBuffer());
+ predicate.setSlice_range(range);
+
+ CTConnection conn = null;
+ try {
+ conn = pool.borrowObject(keyspace);
+ Cassandra.Client client = conn.getClient();
+ Map<ByteBuffer, List<ColumnOrSuperColumn>> rows = client.multiget_slice(CassandraHelper.convert(keys),
+ parent,
+ predicate,
+ consistency);
+
+ /*
+ * "results" holds one entry per row returned by multiget_slice, so its
+ * size is at most rows.size(). Each row's columns are trimmed by
+ * makeEntryList to respect the slice end and the query limit.
+ */
+ Map<StaticBuffer, EntryList> results = new HashMap<StaticBuffer, EntryList>();
+
+ for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> row : rows.entrySet()) {
+ results.put(StaticArrayBuffer.of(row.getKey()),
+ CassandraHelper.makeEntryList(row.getValue(), entryGetter, query.getSliceEnd(), query.getLimit()));
+ }
+
+ return results;
+ } catch (Exception e) {
+ throw convertException(e);
+ } finally {
+ if (conn != null)
+ pool.returnObjectUnsafe(keyspace, conn);
+ }
+ }
+
+ private static class ThriftGetter implements StaticArrayEntry.GetColVal<ColumnOrSuperColumn,ByteBuffer> {
+
+ private final EntryMetaData[] schema;
+
+ private ThriftGetter(EntryMetaData[] schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public ByteBuffer getColumn(ColumnOrSuperColumn element) {
+ return element.getColumn().bufferForName();
+ }
+
+ @Override
+ public ByteBuffer getValue(ColumnOrSuperColumn element) {
+ return element.getColumn().bufferForValue();
+ }
+
+ @Override
+ public EntryMetaData[] getMetaSchema(ColumnOrSuperColumn element) {
+ return schema;
+ }
+
+ @Override
+ public Object getMetaData(ColumnOrSuperColumn element, EntryMetaData meta) {
+ switch(meta) {
+ case TIMESTAMP:
+ return element.getColumn().getTimestamp();
+ case TTL:
+ return element.getColumn().getTtl();
+ default:
+ throw new UnsupportedOperationException("Unsupported meta data: " + meta);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue,
+ StoreTransaction txh) throws BackendException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException {
+ final IPartitioner partitioner = storeManager.getCassandraPartitioner();
+
+ if (!(partitioner instanceof RandomPartitioner) && !(partitioner instanceof Murmur3Partitioner))
+ throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used.");
+
+ try {
+ return new AllTokensIterator(partitioner, sliceQuery, storeManager.getPageSize());
+ } catch (Exception e) {
+ throw convertException(e);
+ }
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery keyRangeQuery, StoreTransaction txh) throws BackendException {
+ final IPartitioner partitioner = storeManager.getCassandraPartitioner();
+
+ // See the rant about the reason for this limitation in the Astyanax implementation of this method.
+ if (!(partitioner instanceof AbstractByteOrderedPartitioner))
+ throw new PermanentBackendException("This operation is only allowed when byte-ordered partitioner is used.");
+
+ try {
+ return new KeyRangeIterator(partitioner, keyRangeQuery, storeManager.getPageSize(),
+ keyRangeQuery.getKeyStart().asByteBuffer(),
+ keyRangeQuery.getKeyEnd().asByteBuffer());
+ } catch (Exception e) {
+ throw convertException(e);
+ }
+ }
+
+ @Override
+ public String getName() {
+ return columnFamily;
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
+ Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions));
+ mutateMany(mutations, txh);
+ }
+
+ public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException {
+ storeManager.mutateMany(ImmutableMap.of(columnFamily, mutations), txh);
+ }
+
+ static BackendException convertException(Throwable e) {
+ // Check the specific Thrift exception subtypes before the generic
+ // TException, which is their common superclass; otherwise the timeout
+ // and unavailable cases would never be classified as temporary.
+ if (e instanceof TimedOutException) {
+ return new TemporaryBackendException(e);
+ } else if (e instanceof UnavailableException) {
+ return new TemporaryBackendException(e);
+ } else if (e instanceof InvalidRequestException) {
+ return new PermanentBackendException(e);
+ } else if (e instanceof TException) {
+ return new PermanentBackendException(e);
+ } else {
+ return new PermanentBackendException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "CassandraThriftKeyColumnValueStore[ks="
+ + keyspace + ", cf=" + columnFamily + "]";
+ }
+
+
+ private List<KeySlice> getKeySlice(ByteBuffer startKey,
+ ByteBuffer endKey,
+ SliceQuery columnSlice,
+ int count) throws BackendException {
+ return getRangeSlices(new org.apache.cassandra.thrift.KeyRange().setStart_key(startKey).setEnd_key(endKey).setCount(count), columnSlice);
+ }
+
+ private <T extends Token> List<KeySlice> getTokenSlice(T startToken, T endToken,
+ SliceQuery sliceQuery, int count) throws BackendException {
+
+ String st = sanitizeBrokenByteToken(startToken);
+ String et = sanitizeBrokenByteToken(endToken);
+
+ org.apache.cassandra.thrift.KeyRange kr = new org.apache.cassandra.thrift.KeyRange().setStart_token(st).setEnd_token(et).setCount(count);
+
+ return getRangeSlices(kr, sliceQuery);
+ }
+
+ private String sanitizeBrokenByteToken(Token tok) {
+ /*
+ * Background: https://issues.apache.org/jira/browse/CASSANDRA-5566
+ *
+ * This check is useful for compatibility with Cassandra server versions
+ * 1.2.4 and earlier.
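+ *
+ * Illustrative example (hypothetical byte value): a BytesToken whose
+ * toString() yields "Token(bytes[0a0b0c])" is rewritten below so that
+ * only "0a0b0c" is sent as the token string.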
+ */
+ String st = tok.toString();
+ if (!(tok instanceof BytesToken))
+ return st;
+
+ // Do a cheap 1-character startsWith before unleashing the regex
+ if (st.startsWith("T")) {
+ Matcher m = BROKEN_BYTE_TOKEN_PATTERN.matcher(st);
+ if (!m.matches()) {
+ logger.warn("Unknown token string format: \"{}\"", st);
+ } else {
+ String old = st;
+ st = m.group(1);
+ logger.debug("Rewrote token string: \"{}\" -> \"{}\"", old, st);
+ }
+ }
+ return st;
+ }
+
+ private List<KeySlice> getRangeSlices(org.apache.cassandra.thrift.KeyRange keyRange, @Nullable SliceQuery sliceQuery) throws BackendException {
+ SliceRange sliceRange = new SliceRange();
+
+ if (sliceQuery == null) {
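+ // No column filter was supplied; fetch only a handful of columns per row,
+ // presumably just enough to distinguish live rows from the "range ghosts"
+ // that are filtered out below.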
+ sliceRange.setStart(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setFinish(ArrayUtils.EMPTY_BYTE_ARRAY)
+ .setCount(5);
+ } else {
+ sliceRange.setStart(sliceQuery.getSliceStart().asByteBuffer())
+ .setFinish(sliceQuery.getSliceEnd().asByteBuffer())
+ .setCount((sliceQuery.hasLimit()) ? sliceQuery.getLimit() : Integer.MAX_VALUE);
+ }
+
+
+ CTConnection connection = null;
+ try {
+ connection = pool.borrowObject(keyspace);
+
+ List<KeySlice> slices =
+ connection.getClient().get_range_slices(new ColumnParent(columnFamily),
+ new SlicePredicate()
+ .setSlice_range(sliceRange),
+ keyRange,
+ ConsistencyLevel.QUORUM);
+
+ for (KeySlice s : slices) {
+ logger.debug("Key {}", ByteBufferUtil.toString(s.key, "-"));
+ }
+
+ /* Note: we need to fetch columns for each row as well to remove "range ghosts" */
+ List<KeySlice> result = new ArrayList<>(slices.size());
+ KeyIterationPredicate pred = new KeyIterationPredicate();
+ for (KeySlice ks : slices)
+ if (pred.apply(ks))
+ result.add(ks);
+ return result;
+ } catch (Exception e) {
+ throw convertException(e);
+ } finally {
+ if (connection != null)
+ pool.returnObjectUnsafe(keyspace, connection);
+ }
+ }
+
+ private static class KeyIterationPredicate implements Predicate<KeySlice> {
+
+ @Override
+ public boolean apply(@Nullable KeySlice row) {
+ return (row != null) && row.getColumns().size() > 0;
+ }
+ }
+
+ /**
+ * Slices rows and columns using tokens. Recall that the partitioner turns
+ * keys into tokens. For instance, under RandomPartitioner, tokens are the
+ * MD5 hashes of keys.
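+ * <p/>
+ * The concrete subclasses below use this in two ways: {@code AllTokensIterator}
+ * pages over the whole ring by starting and ending at the partitioner's
+ * minimum token, while {@code KeyRangeIterator} pages between the tokens
+ * of two concrete keys.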
+ */
+ public class AbstractBufferedRowIter implements KeyIterator {
+
+ private final int pageSize;
+ private final SliceQuery columnSlice;
+
+ private boolean isClosed;
+ private boolean seenEnd;
+ protected Iterator<KeySlice> ksIter;
+ private KeySlice mostRecentRow;
+
+ private final IPartitioner partitioner;
+ private Token nextStartToken;
+ private final Token endToken;
+ private ByteBuffer nextStartKey;
+
+ private boolean omitEndToken;
+
+ public AbstractBufferedRowIter(IPartitioner partitioner,
+ SliceQuery columnSlice, int pageSize, Token startToken, Token endToken, boolean omitEndToken) {
+ this.pageSize = pageSize;
+ this.partitioner = partitioner;
+ this.nextStartToken = startToken;
+ this.endToken = endToken;
+ this.columnSlice = columnSlice;
+
+ this.seenEnd = false;
+ this.isClosed = false;
+ this.ksIter = Iterators.emptyIterator();
+ this.mostRecentRow = null;
+ this.omitEndToken = omitEndToken;
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+
+ if (!ksIter.hasNext() && !seenEnd) {
+ try {
+ ksIter = rebuffer().iterator();
+ } catch (BackendException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return ksIter.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ mostRecentRow = ksIter.next();
+
+ Preconditions.checkNotNull(mostRecentRow);
+ return StaticArrayBuffer.of(mostRecentRow.bufferForKey());
+ }
+
+ @Override
+ public void close() {
+ closeIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RecordIterator<Entry> getEntries() {
+ ensureOpen();
+
+ return new RecordIterator<Entry>() {
+ final Iterator<Entry> columns =
+ CassandraHelper.makeEntryIterator(mostRecentRow.getColumns(),
+ entryGetter, columnSlice.getSliceEnd(),
+ columnSlice.getLimit());
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return columns.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ return columns.next();
+ }
+
+ @Override
+ public void close() {
+ closeIterator();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private void ensureOpen() {
+ if (isClosed)
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+
+ private void closeIterator() {
+ if (!isClosed) {
+ isClosed = true;
+ }
+ }
+
+ private List<KeySlice> rebuffer() throws BackendException {
+
+ Preconditions.checkArgument(!seenEnd);
+
+ return checkFreshSlices(getNextKeySlices());
+ }
+
+ protected List<KeySlice> checkFreshSlices(List<KeySlice> ks) {
+
+ if (0 == ks.size()) {
+ seenEnd = true;
+ return Collections.emptyList();
+ }
+
+ nextStartKey = ks.get(ks.size() - 1).bufferForKey();
+ nextStartToken = partitioner.getToken(nextStartKey);
+
+ if (nextStartToken.equals(endToken)) {
+ seenEnd = true;
+ if (omitEndToken)
+ ks.remove(ks.size() - 1);
+ }
+
+ return ks;
+ }
+
+ protected final List<KeySlice> getNextKeySlices() throws BackendException {
+ return getTokenSlice(nextStartToken, endToken, columnSlice, pageSize);
+ }
+ }
+
+ private final class AllTokensIterator extends AbstractBufferedRowIter {
+ public AllTokensIterator(IPartitioner partitioner, SliceQuery columnSlice, int pageSize) {
+ super(partitioner, columnSlice, pageSize, partitioner.getMinimumToken(), partitioner.getMinimumToken(), false);
+ }
+ }
+
+ private final class KeyRangeIterator extends AbstractBufferedRowIter {
+ public KeyRangeIterator(IPartitioner partitioner, SliceQuery columnSlice,
+ int pageSize, ByteBuffer startKey, ByteBuffer endKey) throws BackendException {
+ super(partitioner, columnSlice, pageSize, partitioner.getToken(startKey), partitioner.getToken(endKey), true);
+
+ Preconditions.checkArgument(partitioner instanceof AbstractByteOrderedPartitioner);
+
+ // Get first slice with key range instead of token range. Token
+ // ranges are start-exclusive, key ranges are start-inclusive. Both
+ // are end-inclusive. If we don't make the call below, then we will
+ // erroneously miss startKey.
+ List<KeySlice> ks = getKeySlice(startKey, endKey, columnSlice, pageSize);
+
+ this.ksIter = checkFreshSlices(ks).iterator();
+ }
+ }
+}