/* * ============LICENSE_START========================================== * org.onap.music * =================================================================== * Copyright (c) 2017 AT&T Intellectual Property * =================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * ============LICENSE_END============================================= * ==================================================================== */ package org.onap.music.datastore; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import com.datastax.driver.core.*; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.format.AppMessages; import org.onap.music.eelf.logging.format.ErrorSeverity; import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicUtil; import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; import org.onap.music.util.TimeMeasureInstance; /** * @author nelson24 * @author bharathb */ public class MusicDataStore { public static final String CONSISTENCY_LEVEL_ONE = "ONE"; public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; public static final String CONSISTENCY_LEVEL_SERIAL = "SERIAL"; private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class); private Session session; private Cluster cluster; public Session getSession() { return session; } /** * Constructs DataStore by providing existing cluster and session * @param cluster * @param session */ public MusicDataStore(Cluster cluster, Session session) { this.session = session; this.cluster = cluster; } /** * * @param keyspace * @param tableName * @param columnName * @return DataType */ public DataType returnColumnDataType(String keyspace, String tableName, String columnName) { KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); TableMetadata table = ks.getTable(tableName); return table.getColumn(columnName).getType(); } /** * * @param keyspace * @param tableName * @return TableMetadata */ public TableMetadata returnColumnMetadata(String keyspace, String tableName) { KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); return ks.getTable(tableName); } /** * Utility function to return the Java specific object type. * * @param row * @param colName * @param colType * @return */ public static Object getColValue(Row row, String colName, DataType colType) { switch (colType.getName()) { case VARCHAR: return row.getString(colName); case UUID: return row.getUUID(colName); case VARINT: return row.getVarint(colName); case BIGINT: return row.getLong(colName); case INT: return row.getInt(colName); case FLOAT: return row.getFloat(colName); case DOUBLE: return row.getDouble(colName); case BOOLEAN: return row.getBool(colName); case MAP: return row.getMap(colName, String.class, String.class); case LIST: return row.getList(colName, String.class); default: return null; } } public byte[] getBlobValue(Row row, String colName, DataType colType) { ByteBuffer bb = row.getBytes(colName); byte[] data = bb.array(); return data; } public static boolean doesRowSatisfyCondition(Row row, Map condition) throws Exception { ColumnDefinitions colInfo = row.getColumnDefinitions(); for (Map.Entry entry : condition.entrySet()) { String colName = entry.getKey(); DataType colType = colInfo.getType(colName); Object columnValue = getColValue(row, colName, colType); Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue()); if (columnValue.equals(conditionValue) == false) return false; } return true; } /** * Utility function to store ResultSet values in to a MAP for output. * * @param results * @return MAP */ public Map> marshalData(ResultSet results) { Map> resultMap = new HashMap>(); int counter = 0; for (Row row : results) { ColumnDefinitions colInfo = row.getColumnDefinitions(); HashMap resultOutput = new HashMap(); for (Definition definition : colInfo) { if (!definition.getName().equals("vector_ts")) { if(definition.getType().toString().toLowerCase().contains("blob")) { resultOutput.put(definition.getName(), getBlobValue(row, definition.getName(), definition.getType())); } else resultOutput.put(definition.getName(), getColValue(row, definition.getName(), definition.getType())); } } resultMap.put("row " + counter, resultOutput); counter++; } return resultMap; } /** * This Method performs DDL and DML operations on Cassandra using specified consistency level outside any time-slot * * @param queryObject Object containing cassandra prepared query and values. * @param consistency Specify consistency level for data synchronization across cassandra * replicas * @return Boolean Indicates operation success or failure * @throws MusicServiceException * @throws MusicQueryException */ public boolean executePut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException, MusicQueryException { return executePut(queryObject, consistency, 0); } // Prepared Statements 1802 additions /** * This Method performs DDL and DML operations on Cassandra using specified consistency level * * @param queryObject Object containing cassandra prepared query and values. * @param consistencyLevel Specify consistency level for data synchronization across cassandra * replicas * @param timeSlot Specify timestamp time-slot * @return Boolean Indicates operation success or failure * @throws MusicServiceException * @throws MusicQueryException */ public boolean executePut(PreparedQueryObject queryObject, String consistencyLevel, long timeSlot) throws MusicServiceException, MusicQueryException { TimeMeasureInstance.instance().enter("executePut" + consistencyLevel); try { boolean result; long timeOfWrite = System.currentTimeMillis(); if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(), AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, "In preprared Execute Put: the actual insert query:" + queryObject.getQuery() + "; the values" + queryObject.getValues()); SimpleStatement statement; try { statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); } catch (InvalidQueryException iqe) { logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(), AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); throw new MusicQueryException(iqe.getMessage()); } catch (Exception e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); throw new MusicQueryException(e.getMessage()); } try { if (consistencyLevel.equalsIgnoreCase(MusicUtil.CRITICAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query"); statement.setConsistencyLevel(ConsistencyLevel.QUORUM); } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.EVENTUAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query"); statement.setConsistencyLevel(ConsistencyLevel.ONE); } long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite); statement.setDefaultTimestamp(timestamp); ResultSet rs = session.execute(statement); result = rs.wasApplied(); } catch (AlreadyExistsException ae) { logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(), AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicServiceException(ae.getMessage()); } catch (Exception e) { logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Executing Session Failure for Request = " + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage()); } return result; } finally { TimeMeasureInstance.instance().exit(); } } /** * This method performs DDL operations on Cassandra using consistency specified consistency. * * @param queryObject Object containing cassandra prepared query and values. */ public ResultSet executeGet(PreparedQueryObject queryObject, String consistencyLevel) throws MusicServiceException, MusicQueryException { if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, "Executing Eventual get query:" + queryObject.getQuery()); ResultSet results = null; try { SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) { statement.setConsistencyLevel(ConsistencyLevel.ONE); } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { statement.setConsistencyLevel(ConsistencyLevel.QUORUM); } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_SERIAL)) { statement.setConsistencyLevel(ConsistencyLevel.SERIAL); } results = session.execute(statement); } catch (Exception ex) { logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicServiceException(ex.getMessage()); } return results; } /** * This method performs DDL operations on Cassandra using consistency level ONE. * * @param queryObject Object containing cassandra prepared query and values. */ public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { TimeMeasureInstance.instance().enter("executeOneConsistencyGet"); try { return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); } finally { TimeMeasureInstance.instance().exit(); } } /** * This method performs DDL operations on Cassandra using consistency level ONE. * * @param queryObject Object containing cassandra prepared query and values. */ public ResultSet executeSerialConsistencyGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { TimeMeasureInstance.instance().enter("executeOneConsistencyGet"); try { return executeGet(queryObject, CONSISTENCY_LEVEL_SERIAL); } finally { TimeMeasureInstance.instance().exit(); } } /** * * This method performs DDL operation on Cassandra using consistency level QUORUM. * * @param queryObject Object containing cassandra prepared query and values. */ public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { TimeMeasureInstance.instance().enter("executeQuorumConsistencyGet"); try { return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); } finally { TimeMeasureInstance.instance().exit(); } } @Deprecated public void close() { session.close(); } }