aboutsummaryrefslogtreecommitdiffstats
path: root/musictrigger/src/MusicTrigger.java
diff options
context:
space:
mode:
Diffstat (limited to 'musictrigger/src/MusicTrigger.java')
-rw-r--r--[-rwxr-xr-x]musictrigger/src/MusicTrigger.java129
1 files changed, 25 insertions, 104 deletions
diff --git a/musictrigger/src/MusicTrigger.java b/musictrigger/src/MusicTrigger.java
index b5894da1..a27a6c40 100755..100644
--- a/musictrigger/src/MusicTrigger.java
+++ b/musictrigger/src/MusicTrigger.java
@@ -23,14 +23,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
@@ -40,7 +33,6 @@ import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.triggers.ITrigger;
@@ -55,109 +47,38 @@ public class MusicTrigger implements ITrigger {
private static final Logger logger = LoggerFactory.getLogger(MusicTrigger.class);
-
public Collection<Mutation> augment(Partition partition)
{
- boolean isDelete = false;
- logger.info("Step 1: "+partition.partitionLevelDeletion().isLive());
- if(partition.partitionLevelDeletion().isLive()) {
-
- } else {
- // Partition Level Deletion
- isDelete = true;
- }
- logger.info("MusicTrigger isDelete: " + isDelete);
- String ksName = partition.metadata().ksName;
String tableName = partition.metadata().cfName;
logger.info("MusicTrigger Table: " + tableName);
- boolean isInsert = checkQueryType(partition);
+
org.json.simple.JSONObject obj = new org.json.simple.JSONObject();
-
- String operation = null;
- if(isDelete)
- operation = "delete";
- else if(isInsert)
- operation = "insert";
- else
- operation = "update";
- Map<String, Object> changeMap = new HashMap<>();
-
- obj.put("operation", operation);
- obj.put("keyspace", ksName);
- obj.put("table_name", tableName);
- obj.put("full_table", ksName+"."+tableName);
- obj.put("primary_key", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
- List<String> updateList = new ArrayList<>();
- //obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
- if("update".equals(operation)) {
- try {
- UnfilteredRowIterator it = partition.unfilteredIterator();
- while (it.hasNext()) {
- Unfiltered un = it.next();
- Clustering clt = (Clustering) un.clustering();
- Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
- Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
-
- while(columns.hasNext()){
- ColumnDefinition columnDef = columns.next();
- Cell cell = cells.next();
-
- String data = null;
- if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.UTF8Type")) {
- logger.info(">> type is String");
- data = new String(cell.value().array()); // If cell type is text
- } else if(cell.column().type.toString().equals("org.apache.cassandra.db.marshal.Int32Type")) {
- //ByteBuffer wrapped = ByteBuffer.wrap(cell.value()); // big-endian by default
- int num = fromByteArray(cell.value().array());
- logger.info(">> type is Integer1 :: "+num);
- data = String.valueOf(num);
- }
-
- logger.info("Inside triggers loop: "+columnDef.name+" : "+data);
- //changeMap.put(ksName+"."+tableName+"."+columnDef.name,data);
- updateList.add(ksName+"."+tableName+":"+columnDef.name+":"+data);
- changeMap.put("field_value",ksName+"."+tableName+":"+columnDef.name+":"+data);
-
- }
- }
- } catch (Exception e) {
-
- }
- obj.put("updateList", updateList);
- } else {
- changeMap.put("field_value", ksName+"."+tableName);
- }
-
- obj.put("changeValue", changeMap);
- logger.info("Sending response: "+obj.toString());
+ obj.put("message_id", partition.metadata().getKeyValidator().getString(partition.partitionKey().getKey()));
+
try {
- notifyMusic(obj.toString());
- } catch(Exception e) {
- e.printStackTrace();
- logger.error("Notification failed..."+e.getMessage());
- }
- return Collections.emptyList();
- }
-
- private int fromByteArray(byte[] bytes) {
- return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF);
- }
-
- private boolean checkQueryType(Partition partition) {
- UnfilteredRowIterator it = partition.unfilteredIterator();
- while (it.hasNext()) {
- Unfiltered unfiltered = it.next();
- Row row = (Row) unfiltered;
- if (isInsert(row)) {
- return true;
+ UnfilteredRowIterator it = partition.unfilteredIterator();
+ while (it.hasNext()) {
+ Unfiltered un = it.next();
+ Clustering clt = (Clustering) un.clustering();
+ Iterator<Cell> cells = partition.getRow(clt).cells().iterator();
+ Iterator<ColumnDefinition> columns = partition.getRow(clt).columns().iterator();
+
+ while(columns.hasNext()){
+ ColumnDefinition columnDef = columns.next();
+ Cell cell = cells.next();
+ String data = new String(cell.value().array()); // If cell type is text
+ obj.put(columnDef.toString(), data);
+ }
}
+ } catch (Exception e) {
+
}
- return false;
+ logger.info("What is this? "+obj.toString());
+
+ notifyMusic(obj.toString());
+ return Collections.emptyList();
}
- private boolean isInsert(Row row) {
- return row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE;
- }
private void notifyMusic(String request) {
System.out.println("notifyMusic...");
@@ -171,12 +92,12 @@ public class MusicTrigger implements ITrigger {
.post(ClientResponse.class, data);
if(response.getStatus() != 200){
- System.out.println("Exception while notifying MUSIC...");
+ System.out.println("Exception...");
}
- /*response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON));
+ response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON));
response.bufferEntity();
String x = response.getEntity(String.class);
- System.out.println("Response: "+x);*/
+ System.out.println("Response: "+x);
}