diff options
Diffstat (limited to 'src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java')
-rw-r--r-- | src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java | 358 |
1 files changed, 358 insertions, 0 deletions
diff --git a/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java new file mode 100644 index 0000000..0d3eb9b --- /dev/null +++ b/src/main/java/com/thinkaurelius/titan/diskstorage/cassandra/astyanax/AstyanaxKeyColumnValueStore.java @@ -0,0 +1,358 @@ +package com.thinkaurelius.titan.diskstorage.cassandra.astyanax; + +import com.google.common.base.Predicate; +import com.google.common.collect.*; +import com.netflix.astyanax.ExceptionCallback; +import com.netflix.astyanax.Keyspace; +import com.netflix.astyanax.connectionpool.OperationResult; +import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; +import com.netflix.astyanax.model.*; +import com.netflix.astyanax.query.AllRowsQuery; +import com.netflix.astyanax.query.RowSliceQuery; +import com.netflix.astyanax.retry.RetryPolicy; +import com.netflix.astyanax.serializers.ByteBufferSerializer; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.cassandra.utils.CassandraHelper; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.util.RecordIterator; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.*; + +import static com.thinkaurelius.titan.diskstorage.cassandra.AbstractCassandraStoreManager.Partitioner; +import static com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction.getTx; + +public class AstyanaxKeyColumnValueStore implements KeyColumnValueStore { + + private final Keyspace keyspace; + private final String columnFamilyName; + private final ColumnFamily<ByteBuffer, ByteBuffer> columnFamily; + private final RetryPolicy retryPolicy; + private final AstyanaxStoreManager storeManager; + private final AstyanaxGetter entryGetter; + + AstyanaxKeyColumnValueStore(String columnFamilyName, + Keyspace keyspace, + AstyanaxStoreManager storeManager, + RetryPolicy retryPolicy) { + this.keyspace = keyspace; + this.columnFamilyName = columnFamilyName; + this.retryPolicy = retryPolicy; + this.storeManager = storeManager; + + entryGetter = new AstyanaxGetter(storeManager.getMetaDataSchema(columnFamilyName)); + + columnFamily = new ColumnFamily<ByteBuffer, ByteBuffer>( + this.columnFamilyName, + ByteBufferSerializer.get(), + ByteBufferSerializer.get()); + + } + + + ColumnFamily<ByteBuffer, ByteBuffer> getColumnFamily() { + return columnFamily; + } + + @Override + public void close() throws BackendException { + //Do nothing + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, EntryList> result = getNamesSlice(query.getKey(), query, txh); + return Iterables.getOnlyElement(result.values(),EntryList.EMPTY_LIST); + } + + @Override + public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(keys, query, txh); + } + + public Map<StaticBuffer, EntryList> getNamesSlice(StaticBuffer key, + SliceQuery query, StoreTransaction txh) throws BackendException { + return getNamesSlice(ImmutableList.of(key),query,txh); + } + + + public Map<StaticBuffer, EntryList> getNamesSlice(List<StaticBuffer> keys, + SliceQuery query, StoreTransaction txh) throws BackendException { + /* + * RowQuery<K,C> should be parameterized as + * RowQuery<ByteBuffer,ByteBuffer>. However, this causes the following + * compilation error when attempting to call withColumnRange on a + * RowQuery<ByteBuffer,ByteBuffer> instance: + * + * java.lang.Error: Unresolved compilation problem: The method + * withColumnRange(ByteBuffer, ByteBuffer, boolean, int) is ambiguous + * for the type RowQuery<ByteBuffer,ByteBuffer> + * + * The compiler substitutes ByteBuffer=C for both startColumn and + * endColumn, compares it to its identical twin with that type + * hard-coded, and dies. + * + */ + RowSliceQuery rq = keyspace.prepareQuery(columnFamily) + .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax()) + .withRetryPolicy(retryPolicy.duplicate()) + .getKeySlice(CassandraHelper.convert(keys)); + + // Thank you, Astyanax, for making builder pattern useful :( + rq.withColumnRange(query.getSliceStart().asByteBuffer(), + query.getSliceEnd().asByteBuffer(), + false, + query.getLimit() + (query.hasLimit()?1:0)); //Add one for potentially removed last column + + OperationResult<Rows<ByteBuffer, ByteBuffer>> r; + try { + r = (OperationResult<Rows<ByteBuffer, ByteBuffer>>) rq.execute(); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + + Rows<ByteBuffer,ByteBuffer> rows = r.getResult(); + Map<StaticBuffer, EntryList> result = new HashMap<StaticBuffer, EntryList>(rows.size()); + + for (Row<ByteBuffer, ByteBuffer> row : rows) { + assert !result.containsKey(row.getKey()); + result.put(StaticArrayBuffer.of(row.getKey()), + CassandraHelper.makeEntryList(row.getColumns(),entryGetter, query.getSliceEnd(), query.getLimit())); + } + + return result; + } + + private static class AstyanaxGetter implements StaticArrayEntry.GetColVal<Column<ByteBuffer>,ByteBuffer> { + + private final EntryMetaData[] schema; + + private AstyanaxGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + + @Override + public ByteBuffer getColumn(Column<ByteBuffer> element) { + return element.getName(); + } + + @Override + public ByteBuffer getValue(Column<ByteBuffer> element) { + return element.getByteBufferValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(Column<ByteBuffer> element) { + return schema; + } + + @Override + public Object getMetaData(Column<ByteBuffer> element, EntryMetaData meta) { + switch(meta) { + case TIMESTAMP: + return element.getTimestamp(); + case TTL: + return element.getTtl(); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { + mutateMany(ImmutableMap.of(key, new KCVMutation(additions, deletions)), txh); + } + + public void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(columnFamilyName, mutations), txh); + } + + @Override + public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException { + throw new UnsupportedOperationException(); + } + + @Override + public KeyIterator getKeys(@Nullable SliceQuery sliceQuery, StoreTransaction txh) throws BackendException { + if (storeManager.getPartitioner() != Partitioner.RANDOM) + throw new PermanentBackendException("This operation is only allowed when random partitioner (md5 or murmur3) is used."); + + AllRowsQuery allRowsQuery = keyspace.prepareQuery(columnFamily).getAllRows(); + + if (sliceQuery != null) { + allRowsQuery.withColumnRange(sliceQuery.getSliceStart().asByteBuffer(), + sliceQuery.getSliceEnd().asByteBuffer(), + false, + sliceQuery.getLimit()); + } + + Rows<ByteBuffer, ByteBuffer> result; + try { + /* Note: we need to fetch columns for each row as well to remove "range ghosts" */ + OperationResult op = allRowsQuery.setRowLimit(storeManager.getPageSize()) // pre-fetch that many rows at a time + .setConcurrencyLevel(1) // one execution thread for fetching portion of rows + .setExceptionCallback(new ExceptionCallback() { + private int retries = 0; + + @Override + public boolean onException(ConnectionException e) { + try { + return retries > 2; // make 3 re-tries + } finally { + retries++; + } + } + }).execute(); + + result = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) op).getResult(); + } catch (ConnectionException e) { + throw new PermanentBackendException(e); + } + + return new RowIterator(result.iterator(), sliceQuery); + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { + // this query could only be done when byte-ordering partitioner is used + // because Cassandra operates on tokens internally which means that even contiguous + // range of keys (e.g. time slice) with random partitioner could produce disjoint set of tokens + // returning ambiguous results to the user. + Partitioner partitioner = storeManager.getPartitioner(); + if (partitioner != Partitioner.BYTEORDER) + throw new PermanentBackendException("getKeys(KeyRangeQuery could only be used with byte-ordering partitioner."); + + ByteBuffer start = query.getKeyStart().asByteBuffer(), end = query.getKeyEnd().asByteBuffer(); + + RowSliceQuery rowSlice = keyspace.prepareQuery(columnFamily) + .setConsistencyLevel(getTx(txh).getReadConsistencyLevel().getAstyanax()) + .withRetryPolicy(retryPolicy.duplicate()) + .getKeyRange(start, end, null, null, Integer.MAX_VALUE); + + // Astyanax is bad at builder pattern :( + rowSlice.withColumnRange(query.getSliceStart().asByteBuffer(), + query.getSliceEnd().asByteBuffer(), + false, + query.getLimit()); + + // Omit final the query's keyend from the result, if present in result + final Rows<ByteBuffer, ByteBuffer> r; + try { + r = ((OperationResult<Rows<ByteBuffer, ByteBuffer>>) rowSlice.execute()).getResult(); + } catch (ConnectionException e) { + throw new TemporaryBackendException(e); + } + Iterator<Row<ByteBuffer, ByteBuffer>> i = + Iterators.filter(r.iterator(), new KeySkipPredicate(query.getKeyEnd().asByteBuffer())); + return new RowIterator(i, query); + } + + @Override + public String getName() { + return columnFamilyName; + } + + private static class KeyIterationPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> { + @Override + public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) { + return (row != null) && row.getColumns().size() > 0; + } + } + + private static class KeySkipPredicate implements Predicate<Row<ByteBuffer, ByteBuffer>> { + + private final ByteBuffer skip; + + public KeySkipPredicate(ByteBuffer skip) { + this.skip = skip; + } + + @Override + public boolean apply(@Nullable Row<ByteBuffer, ByteBuffer> row) { + return (row != null) && !row.getKey().equals(skip); + } + } + + private class RowIterator implements KeyIterator { + private final Iterator<Row<ByteBuffer, ByteBuffer>> rows; + private Row<ByteBuffer, ByteBuffer> currentRow; + private final SliceQuery sliceQuery; + private boolean isClosed; + + public RowIterator(Iterator<Row<ByteBuffer, ByteBuffer>> rowIter, SliceQuery sliceQuery) { + this.rows = Iterators.filter(rowIter, new KeyIterationPredicate()); + this.sliceQuery = sliceQuery; + } + + @Override + public RecordIterator<Entry> getEntries() { + ensureOpen(); + + if (sliceQuery == null) + throw new IllegalStateException("getEntries() requires SliceQuery to be set."); + + return new RecordIterator<Entry>() { + private final Iterator<Entry> columns = + CassandraHelper.makeEntryIterator(currentRow.getColumns(), + entryGetter, + sliceQuery.getSliceEnd(),sliceQuery.getLimit()); + + @Override + public boolean hasNext() { + ensureOpen(); + return columns.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return columns.next(); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean hasNext() { + ensureOpen(); + return rows.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + currentRow = rows.next(); + return StaticArrayBuffer.of(currentRow.getKey()); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + } +} |