From 64b04604921670862993fb2a72a895a6209947d5 Mon Sep 17 00:00:00 2001 From: Thomas Nelson Jr Date: Wed, 8 Aug 2018 00:17:33 +0000 Subject: Updates to Callback Api and Connection Change-Id: I6a3f0537a616ae4d54b47fa2c70ba5128e39f123 Issue-ID: MUSIC-92 Signed-off-by: Thomas Nelson Jr --- musictrigger/src/MusicTrigger.java | 60 ++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 6 deletions(-) (limited to 'musictrigger/src/MusicTrigger.java') diff --git a/musictrigger/src/MusicTrigger.java b/musictrigger/src/MusicTrigger.java index a27a6c40..b3a54293 100644 --- a/musictrigger/src/MusicTrigger.java +++ b/musictrigger/src/MusicTrigger.java @@ -33,6 +33,7 @@ import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.triggers.ITrigger; @@ -47,14 +48,41 @@ public class MusicTrigger implements ITrigger { private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class); + public Collection augment(Partition partition) { + boolean isDelete = false; + logger.info("Step 1: "+partition.partitionLevelDeletion().isLive()); + if(partition.partitionLevelDeletion().isLive()) { + + } else { + // Partition Level Deletion + isDelete = true; + } + logger.info("MusicTrigger isDelete: " + isDelete); + String ksName = partition.metadata().ksName; String tableName = partition.metadata().cfName; logger.info("MusicTrigger Table: " + tableName); - + boolean isInsert = checkQueryType(partition); org.json.simple.JSONObject obj = new org.json.simple.JSONObject(); - obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); - + + + String operation = null; + if(isDelete) + operation = "delete"; + else if(isInsert) + operation = "insert"; + else + operation = "update"; + + + obj.put("operation", operation); + obj.put("keyspace", ksName); + obj.put("table_name", tableName); + obj.put("full_table", ksName+"."+tableName); + obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); + + //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey())); try { UnfilteredRowIterator it = partition.unfilteredIterator(); while (it.hasNext()) { @@ -67,18 +95,38 @@ public class MusicTrigger implements ITrigger { ColumnDefinition columnDef = columns.next(); Cell cell = cells.next(); String data = new String(cell.value().array()); // If cell type is text + logger.info("Inside triggers loop: "+columnDef.toString()+" : "+data); obj.put(columnDef.toString(), data); } } } catch (Exception e) { } - logger.info("What is this? "+obj.toString()); - - notifyMusic(obj.toString()); + logger.info("Sending response: "+obj.toString()); + try { + notifyMusic(obj.toString()); + } catch(Exception e) { + e.printStackTrace(); + logger.error("Notification failed..."+e.getMessage()s); + } return Collections.emptyList(); } + + private boolean checkQueryType(Partition partition) { + UnfilteredRowIterator it = partition.unfilteredIterator(); + while (it.hasNext()) { + Unfiltered unfiltered = it.next(); + Row row = (Row) unfiltered; + if (isInsert(row)) { + return true; + } + } + return false; + } + private boolean isInsert(Row row) { + return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE; + } private void notifyMusic(String request) { System.out.println("notifyMusic..."); -- cgit 1.2.3-korg