aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/music/client/MusicClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/music/client/MusicClient.java')
-rw-r--r--src/main/java/org/onap/music/client/MusicClient.java604
1 files changed, 604 insertions, 0 deletions
diff --git a/src/main/java/org/onap/music/client/MusicClient.java b/src/main/java/org/onap/music/client/MusicClient.java
new file mode 100644
index 00000000..2d29e236
--- /dev/null
+++ b/src/main/java/org/onap/music/client/MusicClient.java
@@ -0,0 +1,604 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ * Copyright (c) 2017 AT&T Intellectual Property
+ * ===================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+package org.onap.music.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.onap.music.datastore.jsonobjects.JsonInsert;
+import org.onap.music.datastore.jsonobjects.JsonKeySpace;
+import org.onap.music.datastore.jsonobjects.JsonTable;
+import org.onap.music.lockingservice.MusicLockingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ColumnDefinitions.Definition;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.Clause;
+import com.datastax.driver.core.querybuilder.Delete;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+
+/**
+ * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified,
+ * from the REST version of the code.
+ *
+ * @author Robert Eby
+ */
+public class MusicClient {
+ private static final Logger LOG = LoggerFactory.getLogger(MusicClient.class);
+
+ private final String[] music_hosts; // array of hosts in the music cluster
+ private Cluster cluster; // MUSIC Cassandra cluster
+ private Session session; // MUSIC Cassandra session
+ private MusicLockingService mls; // ZooKeeper
+ private final Set<String> lockNames;// set of active lock names
+
+ /**
+ * Create a MUSIC client that talks to MUSIC on localhost.
+ */
+ public MusicClient() {
+ this("127.0.0.1");
+ }
+
+ /**
+ * Create a MUSIC client that talks to MUSIC on a remote host. The string <i>hosts</i> is a
+ * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper.
+ *
+ * @param hosts the list of hostnames
+ */
+ public MusicClient(String hosts) {
+ music_hosts = hosts.split(",");
+ if (cluster == null) {
+ LOG.debug("Initializing MUSIC Client with endpoints " + hosts);
+ cluster = Cluster.builder().addContactPoints(music_hosts).build();
+ }
+ session = cluster.connect();
+ mls = null;
+ lockNames = new HashSet<String>();
+ }
+
+ /**
+ * Close the connection to MUSIC.
+ */
+ public void close() {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (cluster != null) {
+ cluster.close();
+ cluster = null;
+ }
+ }
+
+ /**
+ * Be sure to close the connection to MUSIC when this object is GC-ed.
+ */
+ @Override
+ protected void finalize() {
+ close();
+ }
+
+ /**
+ * Return a String representation of the music hosts used by this object.
+ *
+ * @return the string
+ */
+ @Override
+ public String toString() {
+ List<String> t = Arrays.asList(music_hosts);
+ return "MUSIC hosts=" + t.toString();
+ }
+
+ /**
+ * Create a lock.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#createLock(String)
+ * @param lockName the lock name
+ * @return FILL IN
+ */
+ public String createLock(String lockName) {
+ String ln = "/" + lockName;
+ synchronized (lockNames) {
+ lockNames.add(ln);
+ }
+ return getLockingService().createLockId(ln);
+ }
+
+ /**
+ * Acquire a lock.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#lock(String)
+ * @param lockName the lock name
+ * @return FILL IN
+ */
+ public boolean acquireLock(String lockName) {
+ return getLockingService().isMyTurn(lockName);
+ }
+
+ /**
+ * Get the lock holder.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String)
+ * @param lockName the lock name
+ * @return FILL IN
+ */
+ public String getLockHolder(String lockName) {
+ return getLockingService().whoseTurnIsIt("/" + lockName);
+ }
+
+ /**
+ * Unlock a lock.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#unlock(String)
+ * @param lockName the lock name
+ */
+ public void unlockLock(String lockName) {
+ getLockingService().unlockAndDeleteId(lockName);
+ }
+
+ /**
+ * Delete a lock.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
+ * @param lockName the lock name
+ */
+ public void deleteLock(String lockName) {
+ String ln = "/" + lockName;
+ synchronized (lockNames) {
+ lockNames.remove(ln);
+ }
+ getLockingService().deleteLock(ln);
+ }
+
+ /**
+ * Delete all locks.
+ *
+ * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String)
+ * @return true
+ */
+ public boolean deleteAllLocks() {
+ synchronized (lockNames) {
+ for (String lockName : lockNames) {
+ deleteLock(lockName);
+ }
+ lockNames.clear();
+ }
+ return true;
+ }
+
+ /**
+ * Create a keyspace using the default replication configuration.
+ *
+ * @param keyspaceName the name of the keyspace
+ * @return always true currently
+ * @throws Exception Cassandra exceptions are passed through
+ */
+ public boolean createKeyspace(String keyspaceName) throws Exception {
+ Map<String, Object> repl = new HashMap<String, Object>();
+ repl.put("class", "SimpleStrategy");
+ repl.put("replication_factor", 1);
+ Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
+ JsonKeySpace jsonKp = new JsonKeySpace();
+ jsonKp.setConsistencyInfo(consistencyInfo);
+ jsonKp.setDurabilityOfWrites("true");
+ jsonKp.setReplicationInfo(repl);
+ return createKeyspace(keyspaceName, jsonKp);
+ }
+
+ public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
+ String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
+ Map<String, Object> replicationInfo = kspObject.getReplicationInfo();
+ String durability = "";
+ if (kspObject.getDurabilityOfWrites() != null)
+ durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites();
+ String query = String.format(
+ "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;",
+ keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability);
+ LOG.debug(query);
+ executeCreateQuery(query, consistency);
+ return true;
+ }
+
+ public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
+ String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo());
+ String query = String.format("DROP KEYSPACE %s;", keyspaceName);
+ LOG.debug(query);
+ executeCreateQuery(query, consistency);
+ return false;
+ }
+
+ public boolean createTable(String tablename, Map<String, String> cols) throws Exception {
+ JsonTable tableObj = new JsonTable();
+ Map<String, String> map = new HashMap<String, String>(); // This should be in the
+ // consutructor!
+ map.put("type", "eventual");
+ tableObj.setConsistencyInfo(map);
+ return createTable(tablename, cols, tableObj);
+ }
+
+ public boolean createTable(String tablename, Map<String, String> cols, JsonTable tableObj)
+ throws Exception {
+ // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html
+
+ // first read the information about the table fields
+ StringBuilder fields = new StringBuilder();
+ String prefix = "";
+ for (String key : cols.keySet()) {
+ fields.append(prefix).append(key).append(" ").append(cols.get(key));
+ prefix = ", ";
+ }
+
+ // information about the name-value style properties
+ // Map<String,Object> propertiesMap = tableObj.getProperties();
+ // String propertiesString="";
+ // if(propertiesMap != null){
+ // counter =0;
+ // for (Map.Entry<String, Object> entry : propertiesMap.entrySet())
+ // {
+ // Object ot = entry.getValue();
+ // String value = ot+"";
+ // if(ot instanceof String){
+ // value = "'"+value+"'";
+ // }else if(ot instanceof Map){
+ // Map<String,Object> otMap = (Map<String,Object>)ot;
+ // value = "{"+jsonMaptoSqlString(otMap, ",")+"}";
+ // }
+ // propertiesString = propertiesString+entry.getKey()+"="+ value+"";
+ // if(counter!=propertiesMap.size()-1)
+ // propertiesString = propertiesString+" AND ";
+ // counter = counter +1;
+ // }
+ // }
+
+ String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename,
+ fields.toString());
+ // if (propertiesMap != null)
+ // query = query + " WITH "+ propertiesString;
+
+ LOG.debug(query);
+ String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo());
+ executeCreateQuery(query, consistency);
+ return false;
+ }
+
+ public boolean dropTable(String name) {
+ // TODO
+ return false;
+ }
+
+ public boolean insertRow(String name, Map<String, Object> valuesMap) throws Exception {
+ Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
+ return insertRow(name, valuesMap, consistencyInfo, new JsonInsert());
+ }
+
+ public boolean insertRow(String tablename, Map<String, Object> valuesMap,
+ Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
+ // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html
+ String[] parts = tablename.split("\\.");
+ KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
+ TableMetadata tableInfo = ks.getTable(parts[1]);
+
+ StringBuilder fields = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+ String prefix = "";
+ for (String key : valuesMap.keySet()) {
+ fields.append(prefix).append(key);
+ Object valueObj = valuesMap.get(key);
+ DataType colType = tableInfo.getColumn(key).getType();
+ values.append(prefix).append(convertToSqlDataType(colType, valueObj));
+ prefix = ", ";
+ }
+
+ String suffix = getTTLSuffix(insObj);
+ String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename,
+ fields.toString(), values.toString(), suffix);
+ LOG.debug(query);
+
+ String consistency = extractConsistencyInfo(tablename, consistencyInfo);
+ executeCreateQuery(query, consistency);
+ return false;
+ }
+
+ public boolean lockRow(String name, Map<String, String> cols) {
+ // TODO
+ return false;
+ }
+
+ /**
+ * Select ALL rows in the table.
+ *
+ * @param tablename the name of the table
+ * @return a list of maps, one map per row
+ */
+ public List<Map<String, Object>> selectRows(final String tablename) {
+ return selectRows(tablename, new HashMap<String, String>());
+ }
+
+ public List<Map<String, Object>> selectRows(final String tablename, Map<String, String> cols) {
+ String ns = "";
+ String tbl = tablename;
+ int ix = tbl.indexOf('.');
+ if (ix >= 0) {
+ ns = tablename.substring(0, ix);
+ tbl = tablename.substring(ix + 1);
+ }
+ Select sel = QueryBuilder.select().all().from(ns, tbl);
+ Statement stmt = sel;
+ if (cols.size() == 1) {
+ // only handles 1 WHERE value right now
+ String k = cols.keySet().iterator().next();
+ Clause eqclause = QueryBuilder.eq(k, cols.get(k));
+ stmt = sel.where(eqclause);
+ }
+ ResultSet resultset = session.execute(stmt);
+ List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
+ for (Row row : resultset) {
+ Map<String, Object> map = new HashMap<String, Object>();
+ for (Definition definition : row.getColumnDefinitions()) {
+ map.put(definition.getName(),
+ readRow(row, definition.getName(), definition.getType()));
+ }
+ results.add(map);
+ }
+ return results;
+ }
+
+ private Object readRow(final Row row, final String name, final DataType colType) {
+ switch (colType.getName()) {
+ case BIGINT:
+ return row.getLong(name);
+ case BOOLEAN:
+ return row.getBool(name);
+ case DOUBLE:
+ return row.getDouble(name);
+ case FLOAT:
+ return row.getFloat(name);
+ case INT:
+ return row.getInt(name);
+ case MAP:
+ return row.getMap(name, String.class, String.class);
+ case UUID:
+ return row.getUUID(name);
+ case TEXT:
+ case VARCHAR:
+ return row.getString(name);
+ case VARINT:
+ return row.getVarint(name);
+ // These are not supported right now....
+ // ASCII
+ // BLOB
+ // COUNTER
+ // CUSTOM
+ // DECIMAL
+ // INET
+ // LIST
+ // SET
+ // TIMESTAMP
+ // TIMEUUID
+ // TUPLE
+ // UDT
+ default:
+ return null;
+ }
+ }
+
+ @Deprecated
+ public List<Map<String, String>> OLDselectRows(String tablename, Map<String, String> cols) {
+ String query = String.format("SELECT * FROM %s", tablename);
+ if (cols.size() > 0) {
+ // add WHERE clause
+ // String[] parts = tablename.split("\\.");
+ // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
+ // TableMetadata tableInfo = ks.getTable(parts[1]);
+ String whereclause = " WHERE";
+ String prefix = "";
+ for (String key : cols.keySet()) {
+ String val = cols.get(key);
+ // DataType colType = tableInfo.getColumn(key).getType();
+ whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val);
+ prefix = " AND";
+ }
+ query += whereclause;
+ }
+ LOG.debug(query);
+ ResultSet resultset = session.execute(query);
+ List<Map<String, String>> results = new ArrayList<Map<String, String>>();
+ for (Row row : resultset) {
+ ColumnDefinitions colInfo = row.getColumnDefinitions();
+ Map<String, String> map = new HashMap<String, String>();
+ for (Definition definition : colInfo) {
+ // map.put(definition.getName(), (String)MusicDataStore.readRow(row,
+ // definition.getName(), definition.getType()));
+ }
+ results.add(map);
+ }
+ return results;
+ }
+
+ public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals)
+ throws Exception {
+ Map<String, String> consistencyInfo = Collections.singletonMap("type", "eventual");
+ updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert());
+ }
+
+ public void updateRows(String tablename, Map<String, String> cols, Map<String, Object> vals,
+ Map<String, String> consistencyInfo, JsonInsert insObj) throws Exception {
+ // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html
+
+ // obtain the field value pairs of the update
+ String[] parts = tablename.split("\\.");
+ KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]);
+ TableMetadata tableInfo = ks.getTable(parts[1]);
+
+ StringBuilder fields = new StringBuilder();
+ String prefix = "";
+ for (String key : vals.keySet()) {
+ Object valueObj = vals.get(key);
+ String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj);
+ fields.append(prefix).append(key).append(" = ").append(valueString);
+ prefix = ", ";
+ }
+
+ // get the row specifier
+ StringBuilder rows = new StringBuilder();
+ String primaryKey = "";
+ prefix = "";
+ for (String key : cols.keySet()) {
+ String indValue = cols.get(key);
+ DataType colType = tableInfo.getColumn(key).getType();
+ String formattedValue = convertToSqlDataType(colType, indValue);
+ primaryKey = primaryKey + indValue;
+ rows.append(prefix).append(key).append(" = ").append(formattedValue);
+ prefix = " AND ";
+ }
+
+ String using = getTTLSuffix(insObj);
+ String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using,
+ fields.toString(), rows.toString());
+ LOG.debug(query);
+
+ String consistency = extractConsistencyInfo(tablename, consistencyInfo);
+ executeCreateQuery(query, consistency);
+ }
+
+ public void deleteRows(String tablename, Map<String, String> cols) {
+ String ns = "";
+ String tbl = tablename;
+ int ix = tbl.indexOf('.');
+ if (ix >= 0) {
+ ns = tablename.substring(0, ix);
+ tbl = tablename.substring(ix + 1);
+ }
+ Delete stmt = QueryBuilder.delete().from(ns, tbl);
+ if (cols.size() == 1) {
+ // only handles 1 WHERE value right now
+ String k = cols.keySet().iterator().next();
+ Clause eqclause = QueryBuilder.eq(k, cols.get(k));
+ session.execute(stmt.where(eqclause));
+ } else {
+ session.execute(stmt);
+ }
+ }
+
+ private String getTTLSuffix(JsonInsert insObj) {
+ String ttl = insObj.getTtl();
+ String timestamp = insObj.getTimestamp();
+ if (ttl != null && ttl.length() > 0) {
+ if (timestamp != null && timestamp.length() > 0) {
+ return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp;
+ } else {
+ return " USING TTL " + ttl;
+ }
+ } else if (timestamp != null && timestamp.length() > 0) {
+ return " USING TIMESTAMP " + timestamp;
+ }
+ return "";
+ }
+
+ private MusicLockingService getLockingService() {
+ if (mls == null) {
+ mls = new MusicLockingService(music_hosts[0]);
+ }
+ return mls;
+ }
+
+ private String extractConsistencyInfo(String key, Map<String, String> consistencyInfo)
+ throws Exception {
+ String consistency = "";
+ if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) {
+ String lockId = consistencyInfo.get("lockId");
+ String lockName = lockId.substring(lockId.indexOf("$") + 1);
+ lockName = lockName.substring(0, lockName.indexOf("$"));
+
+ // first ensure that the lock name is correct before seeing if it has access
+ if (!lockName.equalsIgnoreCase(key))
+ throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key);
+
+ String lockStatus = getLockingService().isMyTurn(lockId) + "";
+ if (lockStatus.equalsIgnoreCase("false"))
+ throw new Exception("YOU DO NOT HAVE THE LOCK");
+ return "atomic";
+ }
+ if (consistencyInfo.get("type").equalsIgnoreCase("eventual"))
+ return "eventual";
+ throw new Exception("Consistency type " + consistency + " unknown!!");
+ }
+
+ // utility function to parse json map into sql like string
+ private String jsonMaptoSqlString(Map<String, Object> jMap, String lineDelimiter) {
+ String sql = "";
+ String prefix = "";
+ for (Map.Entry<String, Object> entry : jMap.entrySet()) {
+ Object ot = entry.getValue();
+ String value = ot + "";
+ if (ot instanceof String) {
+ value = "'" + value + "'";
+ }
+ sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value);
+ prefix = lineDelimiter;
+ }
+ return sql;
+ }
+
+ private String convertToSqlDataType(DataType type, Object valueObj) {
+ switch (type.getName()) {
+ case TEXT:
+ String t = valueObj.toString();
+ t = t.replaceAll("'", "''");
+ return "'" + t + "'";
+ case MAP:
+ @SuppressWarnings("unchecked")
+ Map<String, Object> otMap = (Map<String, Object>) valueObj;
+ return "{" + jsonMaptoSqlString(otMap, ",") + "}";
+ default:
+ case UUID:
+ return valueObj.toString();
+ }
+ }
+
+ private void executeCreateQuery(String query, String consistency) throws Exception {
+ Statement statement = new SimpleStatement(query);
+ if (consistency.equalsIgnoreCase("atomic"))
+ statement.setConsistencyLevel(ConsistencyLevel.ALL);
+ else if (consistency.equalsIgnoreCase("eventual"))
+ statement.setConsistencyLevel(ConsistencyLevel.ONE);
+ else
+ throw new Exception("Consistency level " + consistency + " unknown!!");
+ session.execute(statement);
+ }
+}