From 46350c084766789ea59e83f1917c57c81d653048 Mon Sep 17 00:00:00 2001 From: "Tschaen, Brendan" Date: Tue, 16 Oct 2018 20:22:35 -0400 Subject: Include Cassandra locking Change-Id: I085acf8336d5f27782ee12768846a5befd3ee60d Issue-ID: MUSIC-148 Signed-off-by: Tschaen, Brendan --- musictrigger/src/MusicTrigger.java | 129 +++++++------------------------------ 1 file changed, 25 insertions(+), 104 deletions(-) mode change 100755 => 100644 musictrigger/src/MusicTrigger.java (limited to 'musictrigger/src/MusicTrigger.java') diff --git a/musictrigger/src/MusicTrigger.java b/musictrigger/src/MusicTrigger.java old mode 100755 new mode 100644 index b5894da1..a27a6c40 --- a/musictrigger/src/MusicTrigger.java +++ b/musictrigger/src/MusicTrigger.java @@ -23,14 +23,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Map; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; @@ -40,7 +33,6 @@ import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.triggers.ITrigger; @@ -55,109 +47,38 @@ public class MusicTrigger implements ITrigger { private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class); - public Collection augment(Partition partition) { - boolean isDelete = false; - logger.info("Step 1: "+partition.partitionLevelDeletion().isLive()); - if(partition.partitionLevelDeletion().isLive()) { - - } else { - // Partition Level Deletion - isDelete = true; - } - logger.info("MusicTrigger isDelete: " + isDelete); - String ksName = partition.metadata().ksName; String tableName = partition.metadata().cfName; logger.info("MusicTrigger Table: " + tableName); - boolean isInsert = checkQueryType(partition); + org.json.simple.JSONObject obj = new org.json.simple.JSONObject(); - - String operation = null; - if(isDelete) - operation = "delete"; - else if(isInsert) - operation = "insert"; - else - operation = "update"; - Map changeMap = new HashMap<>(); - - obj.put("operation", operation); - obj.put("keyspace", ksName); - obj.put("table_name", tableName); - obj.put("full_table", ksName+"."+tableName); - obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); - List updateList = new ArrayList<>(); - //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); - if("update".equals(operation)) { - try { - UnfilteredRowIterator it = partition.unfilteredIterator(); - while (it.hasNext()) { - Unfiltered un = it.next(); - Clustering clt = (Clustering) un.clustering(); - Iterator cells = partition.getRow(clt).cells().iterator(); - Iterator columns = partition.getRow(clt).columns().iterator(); - - while(columns.hasNext()){ - ColumnDefinition columnDef = columns.next(); - Cell cell = cells.next(); - - String data = null; - if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.UTF8Type")) { - logger.info(">> type is String"); - data = new String(cell.value().array()); // If cell type is text - } else if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.Int32Type")) { - //ByteBuffer wrapped = ByteBuffer.wrap(cell.value()); // big-endian by default - int num = fromByteArray(cell.value().array()); - logger.info(">> type is Integer1 :: "+num); - data = String.valueOf(num); - } - - logger.info("Inside triggers loop: "+columnDef.name+" : "+data); - //changeMap.put(ksName+"."+tableName+"."+columnDef.name,data); - updateList.add(ksName+"."+tableName+":"+columnDef.name+":"+data); - changeMap.put("field_value",ksName+"."+tableName+":"+columnDef.name+":"+data); - - } - } - } catch (Exception e) { - - } - obj.put("updateList", updateList); - } else { - changeMap.put("field_value", ksName+"."+tableName); - } - - obj.put("changeValue", changeMap); - logger.info("Sending response: "+obj.toString()); + obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); + try { - notifyMusic(obj.toString()); - } catch(Exception e) { - e.printStackTrace(); - logger.error("Notification failed..."+e.getMessage()); - } - return Collections.emptyList(); - } - - private int fromByteArray(byte[] bytes) { - return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF); - } - - private boolean checkQueryType(Partition partition) { - UnfilteredRowIterator it = partition.unfilteredIterator(); - while (it.hasNext()) { - Unfiltered unfiltered = it.next(); - Row row = (Row) unfiltered; - if (isInsert(row)) { - return true; + UnfilteredRowIterator it = partition.unfilteredIterator(); + while (it.hasNext()) { + Unfiltered un = it.next(); + Clustering clt = (Clustering) un.clustering(); + Iterator cells = partition.getRow(clt).cells().iterator(); + Iterator columns = partition.getRow(clt).columns().iterator(); + + while(columns.hasNext()){ + ColumnDefinition columnDef = columns.next(); + Cell cell = cells.next(); + String data = new String(cell.value().array()); // If cell type is text + obj.put(columnDef.toString(), data); + } } + } catch (Exception e) { + } - return false; + logger.info("What is this? "+obj.toString()); + + notifyMusic(obj.toString()); + return Collections.emptyList(); } - private boolean isInsert(Row row) { - return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE; - } private void notifyMusic(String request) { System.out.println("notifyMusic..."); @@ -171,12 +92,12 @@ public class MusicTrigger implements ITrigger { .post(ClientResponse.class, data); if(response.getStatus() != 200){ - System.out.println("Exception while notifying MUSIC..."); + System.out.println("Exception..."); } - /*response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON)); + response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON)); response.bufferEntity(); String x = response.getEntity(String.class); - System.out.println("Response: "+x);*/ + System.out.println("Response: "+x); } -- cgit 1.2.3-korg