/* * ============LICENSE_START========================================== * org.onap.music * =================================================================== * Copyright (c) 2017 AT&T Intellectual Property * =================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * ============LICENSE_END============================================= * ==================================================================== */ package org.onap.music.client; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import org.onap.music.datastore.jsonobjects.JsonInsert; import org.onap.music.datastore.jsonobjects.JsonKeySpace; import org.onap.music.datastore.jsonobjects.JsonTable; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.lockingservice.MusicLockingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.DataType; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.querybuilder.Clause; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; /** * A MUSIC client that talks directly to Cassandra/ZooKeeper. This was taken, and slightly modified, * from the REST version of the code. * * @author Robert Eby */ public class MusicClient { private EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(MusicClient.class); private final String[] music_hosts; // array of hosts in the music cluster private Cluster cluster; // MUSIC Cassandra cluster private Session session; // MUSIC Cassandra session private MusicLockingService mls; // ZooKeeper private final Set lockNames;// set of active lock names /** * Create a MUSIC client that talks to MUSIC on localhost. */ public MusicClient() { this("127.0.0.1"); } /** * Create a MUSIC client that talks to MUSIC on a remote host. The string hosts is a * comma-separated list of IP addresses for remote instances of Cassandra/ZooKeeper. * * @param hosts the list of hostnames */ public MusicClient(String hosts) { music_hosts = hosts.split(","); if (cluster == null) { LOG.debug("Initializing MUSIC Client with endpoints " + hosts); cluster = Cluster.builder().addContactPoints(music_hosts).build(); } session = cluster.connect(); mls = null; lockNames = new HashSet(); } /** * Close the connection to MUSIC. */ public void close() { if (session != null) { session.close(); session = null; } if (cluster != null) { cluster.close(); cluster = null; } } /** * Be sure to close the connection to MUSIC when this object is GC-ed. */ @Override protected void finalize() { close(); } /** * Return a String representation of the music hosts used by this object. * * @return the string */ @Override public String toString() { List t = Arrays.asList(music_hosts); return "MUSIC hosts=" + t.toString(); } /** * Create a lock. * * @see org.onap.music.lockingservice.MusicLockingService#createLock(String) * @param lockName the lock name * @return FILL IN */ public String createLock(String lockName) { String ln = "/" + lockName; synchronized (lockNames) { lockNames.add(ln); } return getLockingService().createLockId(ln); } /** * Acquire a lock. * * @see org.onap.music.lockingservice.MusicLockingService#lock(String) * @param lockName the lock name * @return FILL IN */ public boolean acquireLock(String lockName) { return getLockingService().isMyTurn(lockName); } /** * Get the lock holder. * * @see org.onap.music.lockingservice.MusicLockingService#currentLockHolder(String) * @param lockName the lock name * @return FILL IN */ public String getLockHolder(String lockName) { return getLockingService().whoseTurnIsIt("/" + lockName); } /** * Unlock a lock. * * @see org.onap.music.lockingservice.MusicLockingService#unlock(String) * @param lockName the lock name */ public void unlockLock(String lockName) { getLockingService().unlockAndDeleteId(lockName); } /** * Delete a lock. * * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String) * @param lockName the lock name */ public void deleteLock(String lockName) { String ln = "/" + lockName; synchronized (lockNames) { lockNames.remove(ln); } getLockingService().deleteLock(ln); } /** * Delete all locks. * * @see org.onap.music.lockingservice.MusicLockingService#deleteLock(String) * @return true */ public boolean deleteAllLocks() { synchronized (lockNames) { for (String lockName : lockNames) { deleteLock(lockName); } lockNames.clear(); } return true; } /** * Create a keyspace using the default replication configuration. * * @param keyspaceName the name of the keyspace * @return always true currently * @throws Exception Cassandra exceptions are passed through */ public boolean createKeyspace(String keyspaceName) throws Exception { Map repl = new HashMap(); repl.put("class", "SimpleStrategy"); repl.put("replication_factor", 1); Map consistencyInfo = Collections.singletonMap("type", "eventual"); JsonKeySpace jsonKp = new JsonKeySpace(); jsonKp.setConsistencyInfo(consistencyInfo); jsonKp.setDurabilityOfWrites("true"); jsonKp.setReplicationInfo(repl); return createKeyspace(keyspaceName, jsonKp); } public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception { String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo()); Map replicationInfo = kspObject.getReplicationInfo(); String durability = ""; if (kspObject.getDurabilityOfWrites() != null) durability = " AND durable_writes = " + kspObject.getDurabilityOfWrites(); String query = String.format( "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { %s } %s;", keyspaceName, jsonMaptoSqlString(replicationInfo, ","), durability); LOG.info(EELFLoggerDelegate.applicationLogger,query); executeCreateQuery(query, consistency); return true; } public boolean dropKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception { String consistency = extractConsistencyInfo(keyspaceName, kspObject.getConsistencyInfo()); String query = String.format("DROP KEYSPACE %s;", keyspaceName); LOG.info(EELFLoggerDelegate.applicationLogger,query); executeCreateQuery(query, consistency); return false; } public boolean createTable(String tablename, Map cols) throws Exception { JsonTable tableObj = new JsonTable(); Map map = new HashMap(); // This should be in the // consutructor! map.put("type", "eventual"); tableObj.setConsistencyInfo(map); return createTable(tablename, cols, tableObj); } public boolean createTable(String tablename, Map cols, JsonTable tableObj) throws Exception { // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html // first read the information about the table fields StringBuilder fields = new StringBuilder(); String prefix = ""; for (String key : cols.keySet()) { fields.append(prefix).append(key).append(" ").append(cols.get(key)); prefix = ", "; } String query = String.format("CREATE TABLE IF NOT EXISTS %s (%s);", tablename, fields.toString()); LOG.debug(query); String consistency = extractConsistencyInfo(tablename, tableObj.getConsistencyInfo()); executeCreateQuery(query, consistency); return false; } public boolean dropTable(String name) { // TODO return false; } public boolean insertRow(String name, Map valuesMap) throws Exception { Map consistencyInfo = Collections.singletonMap("type", "eventual"); return insertRow(name, valuesMap, consistencyInfo, new JsonInsert()); } public boolean insertRow(String tablename, Map valuesMap, Map consistencyInfo, JsonInsert insObj) throws Exception { // Note: https://docs.datastax.com/en/cql/3.0/cql/cql_reference/insert_r.html String[] parts = tablename.split("\\."); KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]); TableMetadata tableInfo = ks.getTable(parts[1]); StringBuilder fields = new StringBuilder(); StringBuilder values = new StringBuilder(); String prefix = ""; for (String key : valuesMap.keySet()) { fields.append(prefix).append(key); Object valueObj = valuesMap.get(key); DataType colType = tableInfo.getColumn(key).getType(); values.append(prefix).append(convertToSqlDataType(colType, valueObj)); prefix = ", "; } String suffix = getTTLSuffix(insObj); String query = String.format("INSERT INTO %s (%s) VALUES (%s)%s;", tablename, fields.toString(), values.toString(), suffix); LOG.info(EELFLoggerDelegate.applicationLogger,query); String consistency = extractConsistencyInfo(tablename, consistencyInfo); executeCreateQuery(query, consistency); return false; } public boolean lockRow(String name, Map cols) { // TODO return false; } /** * Select ALL rows in the table. * * @param tablename the name of the table * @return a list of maps, one map per row */ public List> selectRows(final String tablename) { return selectRows(tablename, new HashMap()); } public List> selectRows(final String tablename, Map cols) { String ns = ""; String tbl = tablename; int ix = tbl.indexOf('.'); if (ix >= 0) { ns = tablename.substring(0, ix); tbl = tablename.substring(ix + 1); } Select sel = QueryBuilder.select().all().from(ns, tbl); Statement stmt = sel; if (cols.size() == 1) { // only handles 1 WHERE value right now String k = cols.keySet().iterator().next(); Clause eqclause = QueryBuilder.eq(k, cols.get(k)); stmt = sel.where(eqclause); } ResultSet resultset = session.execute(stmt); List> results = new ArrayList>(); for (Row row : resultset) { Map map = new HashMap(); for (Definition definition : row.getColumnDefinitions()) { map.put(definition.getName(), readRow(row, definition.getName(), definition.getType())); } results.add(map); } return results; } private Object readRow(final Row row, final String name, final DataType colType) { switch (colType.getName()) { case BIGINT: return row.getLong(name); case BOOLEAN: return row.getBool(name); case DOUBLE: return row.getDouble(name); case FLOAT: return row.getFloat(name); case INT: return row.getInt(name); case MAP: return row.getMap(name, String.class, String.class); case UUID: return row.getUUID(name); case TEXT: case VARCHAR: return row.getString(name); case VARINT: return row.getVarint(name); // These are not supported right now.... // ASCII // BLOB // COUNTER // CUSTOM // DECIMAL // INET // LIST // SET // TIMESTAMP // TIMEUUID // TUPLE // UDT default: return null; } } @Deprecated public List> OLDselectRows(String tablename, Map cols) { String query = String.format("SELECT * FROM %s", tablename); if (cols.size() > 0) { // add WHERE clause // String[] parts = tablename.split("\\."); // KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]); // TableMetadata tableInfo = ks.getTable(parts[1]); String whereclause = " WHERE"; String prefix = ""; for (String key : cols.keySet()) { String val = cols.get(key); // DataType colType = tableInfo.getColumn(key).getType(); whereclause = String.format("%s%s %s = '%s'", whereclause, prefix, key, val); prefix = " AND"; } query += whereclause; } LOG.debug(query); ResultSet resultset = session.execute(query); List> results = new ArrayList>(); for (Row row : resultset) { ColumnDefinitions colInfo = row.getColumnDefinitions(); Map map = new HashMap(); for (Definition definition : colInfo) { // map.put(definition.getName(), (String)MusicDataStore.readRow(row, // definition.getName(), definition.getType())); } results.add(map); } return results; } public void updateRows(String tablename, Map cols, Map vals) throws Exception { Map consistencyInfo = Collections.singletonMap("type", "eventual"); updateRows(tablename, cols, vals, consistencyInfo, new JsonInsert()); } public void updateRows(String tablename, Map cols, Map vals, Map consistencyInfo, JsonInsert insObj) throws Exception { // https://docs.datastax.com/en/cql/3.0/cql/cql_reference/update_r.html // obtain the field value pairs of the update String[] parts = tablename.split("\\."); KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(parts[0]); TableMetadata tableInfo = ks.getTable(parts[1]); StringBuilder fields = new StringBuilder(); String prefix = ""; for (String key : vals.keySet()) { Object valueObj = vals.get(key); String valueString = convertToSqlDataType(tableInfo.getColumn(key).getType(), valueObj); fields.append(prefix).append(key).append(" = ").append(valueString); prefix = ", "; } // get the row specifier StringBuilder rows = new StringBuilder(); String primaryKey = ""; prefix = ""; for (String key : cols.keySet()) { String indValue = cols.get(key); DataType colType = tableInfo.getColumn(key).getType(); String formattedValue = convertToSqlDataType(colType, indValue); primaryKey = primaryKey + indValue; rows.append(prefix).append(key).append(" = ").append(formattedValue); prefix = " AND "; } String using = getTTLSuffix(insObj); String query = String.format("UPDATE %s%s SET %s WHERE %s;", tablename, using, fields.toString(), rows.toString()); LOG.debug(query); String consistency = extractConsistencyInfo(tablename, consistencyInfo); executeCreateQuery(query, consistency); } public void deleteRows(String tablename, Map cols) { String ns = ""; String tbl = tablename; int ix = tbl.indexOf('.'); if (ix >= 0) { ns = tablename.substring(0, ix); tbl = tablename.substring(ix + 1); } Delete stmt = QueryBuilder.delete().from(ns, tbl); if (cols.size() == 1) { // only handles 1 WHERE value right now String k = cols.keySet().iterator().next(); Clause eqclause = QueryBuilder.eq(k, cols.get(k)); session.execute(stmt.where(eqclause)); } else { session.execute(stmt); } } private String getTTLSuffix(JsonInsert insObj) { String ttl = insObj.getTtl(); String timestamp = insObj.getTimestamp(); if (ttl != null && ttl.length() > 0) { if (timestamp != null && timestamp.length() > 0) { return " USING TTL " + ttl + " AND TIMESTAMP " + timestamp; } else { return " USING TTL " + ttl; } } else if (timestamp != null && timestamp.length() > 0) { return " USING TIMESTAMP " + timestamp; } return ""; } private MusicLockingService getLockingService() { if (mls == null) { mls = new MusicLockingService(music_hosts[0]); } return mls; } private String extractConsistencyInfo(String key, Map consistencyInfo) throws Exception { String consistency = ""; if (consistencyInfo.get("type").equalsIgnoreCase("atomic")) { String lockId = consistencyInfo.get("lockId"); String lockName = lockId.substring(lockId.indexOf("$") + 1); lockName = lockName.substring(0, lockName.indexOf("$")); // first ensure that the lock name is correct before seeing if it has access if (!lockName.equalsIgnoreCase(key)) throw new Exception("THIS LOCK IS NOT FOR THE KEY: " + key); String lockStatus = getLockingService().isMyTurn(lockId) + ""; if (lockStatus.equalsIgnoreCase("false")) throw new Exception("YOU DO NOT HAVE THE LOCK"); return "atomic"; } if (consistencyInfo.get("type").equalsIgnoreCase("eventual")) return "eventual"; throw new Exception("Consistency type " + consistency + " unknown!!"); } // utility function to parse json map into sql like string private String jsonMaptoSqlString(Map jMap, String lineDelimiter) { String sql = ""; String prefix = ""; for (Map.Entry entry : jMap.entrySet()) { Object ot = entry.getValue(); String value = ot + ""; if (ot instanceof String) { value = "'" + value + "'"; } sql = String.format("%s%s'%s': %s", sql, prefix, entry.getKey(), value); prefix = lineDelimiter; } return sql; } private String convertToSqlDataType(DataType type, Object valueObj) { switch (type.getName()) { case TEXT: String t = valueObj.toString(); t = t.replaceAll("'", "''"); return "'" + t + "'"; case MAP: @SuppressWarnings("unchecked") Map otMap = (Map) valueObj; return "{" + jsonMaptoSqlString(otMap, ",") + "}"; default: case UUID: return valueObj.toString(); } } private void executeCreateQuery(String query, String consistency) throws Exception { Statement statement = new SimpleStatement(query); if (consistency.equalsIgnoreCase("atomic")) statement.setConsistencyLevel(ConsistencyLevel.ALL); else if (consistency.equalsIgnoreCase("eventual")) statement.setConsistencyLevel(ConsistencyLevel.ONE); else throw new Exception("Consistency level " + consistency + " unknown!!"); session.execute(statement); } }