diff options
Diffstat (limited to 'src/main/java/org')
11 files changed, 637 insertions, 145 deletions
diff --git a/src/main/java/org/onap/music/datastore/jsonobjects/JSONCallbackResponse.java b/src/main/java/org/onap/music/datastore/jsonobjects/JSONCallbackResponse.java new file mode 100755 index 00000000..c521d0df --- /dev/null +++ b/src/main/java/org/onap/music/datastore/jsonobjects/JSONCallbackResponse.java @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ + +package org.onap.music.datastore.jsonobjects; + +import java.util.Map; + +public class JSONCallbackResponse { + + private String full_table; + private String keyspace; + private Map<String, String> changeValue; + private String operation; + private String table_name; + private String primary_key; + private Object miscObjects; + public String getFull_table() { + return full_table; + } + public void setFull_table(String full_table) { + this.full_table = full_table; + } + public String getKeyspace() { + return keyspace; + } + public void setKeyspace(String keyspace) { + this.keyspace = keyspace; + } + public String getOperation() { + return operation; + } + public void setOperation(String operation) { + this.operation = operation; + } + public String getTable_name() { + return table_name; + } + public void setTable_name(String table_name) { + this.table_name = table_name; + } + public String getPrimary_key() { + return primary_key; + } + public void setPrimary_key(String primary_key) { + this.primary_key = primary_key; + } + public Object getMiscObjects() { + return miscObjects; + } + public void setMiscObjects(Object miscObjects) { + this.miscObjects = miscObjects; + } + public void setChangeValue(Map<String, String> changeValue) { + this.changeValue = changeValue; + } + public Map<String, String> getChangeValue() { + return changeValue; + } + + +} diff --git a/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java b/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java index 42b12f10..4a865320 100644..100755 --- a/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java +++ b/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java @@ -22,6 +22,7 @@ package org.onap.music.datastore.jsonobjects; import java.io.Serializable; +import java.util.Map; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -39,6 +40,17 @@ public class JsonCallback implements Serializable { private String notifyWhenChangeIn; private String notifyWhenInsertsIn; private String notifyWhenDeletesIn; + private Map<String, String> responseBody; + + private String uuid; + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } @ApiModelProperty(value = "application name") public String getApplicationName() { @@ -112,4 +124,12 @@ public class JsonCallback implements Serializable { this.notifyWhenDeletesIn = notifyWhenDeletesIn; } + public Map<String, String> getResponseBody() { + return responseBody; + } + + public void setResponseBody(Map<String, String> responseBody) { + this.responseBody = responseBody; + } + } diff --git a/src/main/java/org/onap/music/datastore/jsonobjects/JsonNotification.java b/src/main/java/org/onap/music/datastore/jsonobjects/JsonNotification.java new file mode 100755 index 00000000..ad999190 --- /dev/null +++ b/src/main/java/org/onap/music/datastore/jsonobjects/JsonNotification.java @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.datastore.jsonobjects; + +import java.io.Serializable; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(value = "JsonNotification", description = "Json model for callback") +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(Include.NON_NULL) +public class JsonNotification implements Serializable { + + private String notify_field; + private String endpoint; + private String username; + private String password; + private String notify_change; + private String notify_insert; + private String notify_delete; + private String operation_type; + + private Map<String, String> response_body; + + public String getNotify_field() { + return notify_field; + } + public void setNotify_field(String notify_field) { + this.notify_field = notify_field; + } + public String getEndpoint() { + return endpoint; + } + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + public String getUsername() { + return username; + } + public void setUsername(String username) { + this.username = username; + } + public String getPassword() { + return password; + } + public void setPassword(String password) { + this.password = password; + } + public Map<String, String> getResponse_body() { + return response_body; + } + public void setResponse_body(Map<String, String> response_body) { + this.response_body = response_body; + } + public String getNotify_change() { + return notify_change; + } + public void setNotify_change(String notify_change) { + this.notify_change = notify_change; + } + public String getNotify_insert() { + return notify_insert; + } + public void setNotify_insert(String notify_insert) { + this.notify_insert = notify_insert; + } + public String getNotify_delete() { + return notify_delete; + } + public void setNotify_delete(String notify_delete) { + this.notify_delete = notify_delete; + } + public String getOperation_type() { + return operation_type; + } + public void setOperation_type(String operation_type) { + this.operation_type = operation_type; + } + + @Override + public String toString() { + try { + return new com.fasterxml.jackson.databind.ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this); + } catch (com.fasterxml.jackson.core.JsonProcessingException e) { + return notify_field+ " : "+endpoint+ " : "+username+ " : "+password+ " : "+response_body; + } + + } + +} diff --git a/src/main/java/org/onap/music/lockingservice/MusicLockingService.java b/src/main/java/org/onap/music/lockingservice/MusicLockingService.java index ae026903..f9c09338 100644 --- a/src/main/java/org/onap/music/lockingservice/MusicLockingService.java +++ b/src/main/java/org/onap/music/lockingservice/MusicLockingService.java @@ -33,6 +33,7 @@ import org.onap.music.eelf.logging.format.ErrorSeverity; import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.CachingUtil; import org.onap.music.main.MusicUtil; @@ -131,6 +132,7 @@ public class MusicLockingService implements Watcher { public void unlockAndDeleteId(String lockIdWithDollar) throws KeeperException.NoNodeException { String lockId = lockIdWithDollar.replace('$', '/'); zkLockHandle.unlock(lockId); + CachingUtil.deleteKeysFromDB("'"+lockId+"'"); } public void deleteLock(String lockName) throws MusicLockingException { diff --git a/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java b/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java index 38c873af..5f9c07ee 100644 --- a/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java +++ b/src/main/java/org/onap/music/lockingservice/ZkStatelessLockService.java @@ -301,7 +301,6 @@ public class ZkStatelessLockService extends ProtocolSupport { e1.printStackTrace(); } Long ctime = stat.getCtime(); - System.out.println("Created id ....####"+ctime+"##.......id...:"+id); MusicUtil.zkNodeMap.put(id, ctime); PreparedQueryObject pQuery = new PreparedQueryObject(); pQuery.appendQueryString( diff --git a/src/main/java/org/onap/music/main/CachingUtil.java b/src/main/java/org/onap/music/main/CachingUtil.java index d3654118..83b5158f 100755 --- a/src/main/java/org/onap/music/main/CachingUtil.java +++ b/src/main/java/org/onap/music/main/CachingUtil.java @@ -41,7 +41,8 @@ import org.onap.music.eelf.logging.format.AppMessages; import org.onap.music.eelf.logging.format.ErrorSeverity; import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicServiceException; - +import org.onap.music.datastore.jsonobjects.JsonNotification; +import org.onap.music.datastore.jsonobjects.JsonCallback; import com.att.eelf.configuration.EELFLogger; import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; @@ -65,6 +66,7 @@ public class CachingUtil implements Runnable { private static CacheAccess<String, Map<String, String>> aafCache = JCS.getInstance("aafCache"); private static CacheAccess<String, String> appNameCache = JCS.getInstance("appNameCache"); private static CacheAccess<String, Map<String, String>> musicValidateCache = JCS.getInstance("musicValidateCache"); + private static CacheAccess<String, JsonCallback> callBackCache = JCS.getInstance("callBackCache"); private static Map<String, Number> userAttempts = new HashMap<>(); private static Map<String, Calendar> lastFailedTime = new HashMap<>(); @@ -73,6 +75,14 @@ public class CachingUtil implements Runnable { return true; return false; } + + public static void updateCallBackCache(String appName, JsonCallback jsonCallBack) { + callBackCache.put(appName, jsonCallBack); + } + + public static JsonCallback getCallBackCache(String appName) { + return callBackCache.get(appName); + } public void initializeMusicCache() { logger.info(EELFLoggerDelegate.applicationLogger,"Initializing Music Cache..."); @@ -102,8 +112,8 @@ public class CachingUtil implements Runnable { String keySpace = row.getString("application_name"); try { userAttempts.put(nameSpace, 0); - AAFResponse responseObj = triggerAAF(nameSpace, userId, password); - if (responseObj.getNs().size() > 0) { + boolean responseObj = triggerAAF(nameSpace, userId, password); + if (responseObj) { map = new HashMap<>(); map.put(userId, password); aafCache.put(nameSpace, map); @@ -164,8 +174,8 @@ public class CachingUtil implements Runnable { } } - AAFResponse responseObj = triggerAAF(nameSpace, userId, password); - if (responseObj.getNs().size() > 0) { + boolean responseObj = triggerAAF(nameSpace, userId, password); + if (responseObj) { //if (responseObj.getNs().get(0).getAdmin().contains(userId)) { //Map<String, String> map = new HashMap<>(); //map.put(userId, password); @@ -177,7 +187,7 @@ public class CachingUtil implements Runnable { return false; } - private static AAFResponse triggerAAF(String nameSpace, String userId, String password) + private static boolean triggerAAF(String nameSpace, String userId, String password) throws Exception { if (MusicUtil.getAafEndpointUrl() == null) { logger.error(EELFLoggerDelegate.errorLogger,"",AppMessages.UNKNOWNERROR,ErrorSeverity.WARN, ErrorTypes.GENERALSERVICEERROR); @@ -210,14 +220,14 @@ public class CachingUtil implements Runnable { // TODO Allow for 2-3 times and forbid any attempt to trigger AAF with invalid values // for specific time. } - response.getHeaders().put(HttpHeaders.CONTENT_TYPE, + /*response.getHeaders().put(HttpHeaders.CONTENT_TYPE, Arrays.asList(MediaType.APPLICATION_JSON)); // AAFResponse output = response.getEntity(AAFResponse.class); response.bufferEntity(); String x = response.getEntity(String.class); - AAFResponse responseObj = new ObjectMapper().readValue(x, AAFResponse.class); + AAFResponse responseObj = new ObjectMapper().readValue(x, AAFResponse.class);*/ - return responseObj; + return true; } public static void updateMusicCache(String keyspace, String nameSpace) { @@ -417,4 +427,15 @@ public class CachingUtil implements Runnable { CachingUtil.updateMusicValidateCache(nameSpace, userId, pwd); return resultMap; } + + public static void deleteKeysFromDB(String deleteKeys) { + PreparedQueryObject pQuery = new PreparedQueryObject(); + pQuery.appendQueryString( + "DELETE FROM admin.locks WHERE lock_id IN ("+deleteKeys+")"); + try { + MusicCore.nonKeyRelatedPut(pQuery, "eventual"); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/org/onap/music/main/CronJobManager.java b/src/main/java/org/onap/music/main/CronJobManager.java index 5b7a8de4..0344c4a1 100644 --- a/src/main/java/org/onap/music/main/CronJobManager.java +++ b/src/main/java/org/onap/music/main/CronJobManager.java @@ -32,16 +32,18 @@ import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicServiceException; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -@WebListener +//@WebListener public class CronJobManager implements ServletContextListener { private ScheduledExecutorService scheduler; + private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CronJobManager.class); @Override public void contextInitialized(ServletContextEvent event) { @@ -55,8 +57,23 @@ public class CronJobManager implements ServletContextListener { } catch (MusicServiceException e1) { e1.printStackTrace(); } - - pQuery = new PreparedQueryObject(); + + //Zookeeper cleanup + scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + deleteLocksFromDB(); + } + } , 0, 24, TimeUnit.HOURS); + } + + @Override + public void contextDestroyed(ServletContextEvent event) { + scheduler.shutdownNow(); + } + + public void deleteLocksFromDB() { + PreparedQueryObject pQuery = new PreparedQueryObject(); pQuery.appendQueryString( "select * from admin.locks"); try { @@ -71,68 +88,22 @@ public class CronJobManager implements ServletContextListener { if(System.currentTimeMillis() >= ctime + 24 * 60 * 60 * 1000) { expiredKeys = true; String new_id = id.substring(1); - MusicCore.deleteLock(new_id); - deleteKeys.append(id).append(","); + try { + MusicCore.deleteLock(new_id); + } catch (MusicLockingException e) { + logger.info(EELFLoggerDelegate.applicationLogger, + e.getMessage()); + } + deleteKeys.append("'").append(id).append("'").append(","); } - else { - MusicUtil.zkNodeMap.put(id, ctime); - } - }; + } if(expiredKeys) { deleteKeys.deleteCharAt(deleteKeys.length()-1); - deleteKeysFromDB(deleteKeys); + CachingUtil.deleteKeysFromDB(deleteKeys.toString()); } } catch (MusicServiceException e) { e.printStackTrace(); - } catch (MusicLockingException e) { - e.printStackTrace(); - } - - //Zookeeper cleanup - scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - Iterator<Entry<String, Long>> it = MusicUtil.zkNodeMap.entrySet().iterator(); - StringBuilder deleteKeys = new StringBuilder(); - Boolean expiredKeys = false; - while (it.hasNext()) { - Map.Entry<String, Long> pair = (Map.Entry<String, Long>)it.next(); - long ctime = pair.getValue(); - if (System.currentTimeMillis() >= ctime + 24 * 60 * 60 * 1000) { - try { - expiredKeys = true; - String id = pair.getKey(); - deleteKeys.append("'").append(id).append("'").append(","); - MusicCore.deleteLock(id.substring(1)); - MusicUtil.zkNodeMap.remove(id); - - } catch (MusicLockingException e) { - e.printStackTrace(); - } - } - } - if(expiredKeys) { - deleteKeys.deleteCharAt(deleteKeys.length()-1); - deleteKeysFromDB(deleteKeys); - } } - } , 0, 24, TimeUnit.HOURS); - } - - @Override - public void contextDestroyed(ServletContextEvent event) { - scheduler.shutdownNow(); - } - - public void deleteKeysFromDB(StringBuilder deleteKeys) { - PreparedQueryObject pQuery = new PreparedQueryObject(); - pQuery.appendQueryString( - "DELETE FROM admin.locks WHERE lock_id IN ("+deleteKeys+")"); - try { - MusicCore.nonKeyRelatedPut(pQuery, "eventual"); - } catch (Exception e) { - e.printStackTrace(); - } } } diff --git a/src/main/java/org/onap/music/main/MusicCore.java b/src/main/java/org/onap/music/main/MusicCore.java index dfc93ccc..b729ba74 100644 --- a/src/main/java/org/onap/music/main/MusicCore.java +++ b/src/main/java/org/onap/music/main/MusicCore.java @@ -708,6 +708,22 @@ public class MusicCore { } return results; } + + public static String getMyHostId() { + PreparedQueryObject pQuery = new PreparedQueryObject(); + pQuery.appendQueryString("SELECT HOST_ID FROM SYSTEM.LOCAL"); + ResultSet rs = null; + try { + rs = getDSHandle().executeEventualGet(pQuery); + Row row = rs.one(); + return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString(); + } catch (Exception e) { + e.printStackTrace(); + logger.error(EELFLoggerDelegate.errorLogger,e.getMessage()); + } + logger.error(EELFLoggerDelegate.errorLogger, "Some issue during MusicCore.getMyHostId"); + return "UNKNOW"; + } /** * This method performs DDL operations on cassandra, if the the resource is available. Lock ID diff --git a/src/main/java/org/onap/music/main/MusicUtil.java b/src/main/java/org/onap/music/main/MusicUtil.java index 2dd2f231..40d19766 100755 --- a/src/main/java/org/onap/music/main/MusicUtil.java +++ b/src/main/java/org/onap/music/main/MusicUtil.java @@ -86,6 +86,8 @@ public class MusicUtil { private static String myCassaHost = LOCALHOST; private static String defaultMusicIp = LOCALHOST; private static int cassandraPort = 9042; + private static int notifytimeout = 30000; + private static int notifyinterval = 5000; private static boolean debug = true; private static String version = "2.3.0"; @@ -94,7 +96,7 @@ public class MusicUtil { private static long defaultLockLeasePeriod = 6000; private static final String[] propKeys = new String[] { "zookeeper.host", "cassandra.host", "music.ip", "debug", "version", "music.rest.ip", "music.properties", "lock.lease.period", "id", "all.ids", "public.ip", - "all.pubic.ips", "cassandra.user", "cassandra.password", "aaf.endpoint.url","cassandra.port" }; + "all.pubic.ips", "cassandra.user", "cassandra.password", "aaf.endpoint.url","cassandra.port", "notify.timeout", "notify.interval" }; private static String cassName = "cassandra"; private static String cassPwd; @@ -608,7 +610,24 @@ public class MusicUtil { MusicUtil.setCassName(prop.getProperty("cassandra.user")); MusicUtil.setCassPwd(prop.getProperty("cassandra.password")); MusicUtil.setCassandraPort(Integer.parseInt(prop.getProperty("cassandra.port"))); + MusicUtil.setNotifyTimeOut(Integer.parseInt(prop.getProperty("notify.timeout"))); + MusicUtil.setNotifyInterval(Integer.parseInt(prop.getProperty("notify.interval"))); } + + private static void setNotifyInterval(int notifyinterval) { + MusicUtil.notifyinterval = notifyinterval; + } + private static void setNotifyTimeOut(int notifytimeout) { + MusicUtil.notifytimeout = notifytimeout; + } + public static int getNotifyInterval() { + return MusicUtil.notifyinterval; + } + + public static int getNotifyTimeout() { + return MusicUtil.notifytimeout; + + } } diff --git a/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java b/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java index 71570b6c..e930e48b 100755 --- a/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java +++ b/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java @@ -22,28 +22,32 @@ package org.onap.music.rest; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; + import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; -import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.Produces; -import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; +import org.codehaus.jackson.map.ObjectMapper; import org.mindrot.jbcrypt.BCrypt; import org.onap.music.datastore.PreparedQueryObject; +import org.onap.music.datastore.jsonobjects.JSONCallbackResponse; import org.onap.music.datastore.jsonobjects.JSONObject; import org.onap.music.datastore.jsonobjects.JsonCallback; +import org.onap.music.datastore.jsonobjects.JsonNotification; import org.onap.music.datastore.jsonobjects.JsonOnboard; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.format.AppMessages; @@ -54,6 +58,7 @@ import org.onap.music.main.CachingUtil; import org.onap.music.main.MusicCore; import org.onap.music.main.MusicUtil; import org.onap.music.main.ResultType; +import org.onap.music.main.ReturnType; import org.onap.music.response.jsonobjects.JsonResponse; import com.datastax.driver.core.DataType; @@ -67,8 +72,15 @@ import com.sun.jersey.core.util.Base64; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; -import org.apache.commons.jcs.JCS; -import org.apache.commons.jcs.access.CacheAccess; +import com.datastax.driver.core.TableMetadata; + +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ColumnDefinitions.Definition; +import com.datastax.driver.core.TableMetadata; +import java.util.Base64.Encoder; +import java.util.Base64.Decoder; @Path("/v2/admin") // @Path("/v{version: [0-9]+}/admin") @@ -123,7 +135,7 @@ public class RestMusicAdminAPI { MusicUtil.DEFAULTKEYSPACENAME)); pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), appName)); pQuery.addValue(MusicUtil.convertToActualDataType(DataType.cboolean(), "True")); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), BCrypt.hashpw(password, BCrypt.gensalt()))); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), password)); pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), userId)); pQuery.addValue(MusicUtil.convertToActualDataType(DataType.cboolean(), isAAF)); @@ -172,9 +184,8 @@ public class RestMusicAdminAPI { if (cql.endsWith("AND ")) cql = cql.trim().substring(0, cql.length() - 4); - System.out.println("Query is: " + cql); + logger.info("Query in callback is: " + cql); cql = cql + " allow filtering"; - System.out.println("Get OnboardingInfo CQL: " + cql); pQuery.appendQueryString(cql); if (appName != null) pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), appName)); @@ -390,98 +401,336 @@ public class RestMusicAdminAPI { @Path("/callbackOps") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) - public String callbackOps(JSONObject inputJsonObj) { - // trigger response {"full_table":"admin.race_winners","keyspace":"admin","name":"Siri","operation":"update","table_name":"race_winner","primary_key":"1"} - try { - logger.info("Got notification: " + inputJsonObj.getData()); - String dataStr = inputJsonObj.getData(); - String[] dataStrArr = dataStr.substring(1, dataStr.length() - 1).split(","); - - for (String key : dataStrArr) { - if (key.contains("full_table")) { - String tableName = key.split(":")[1].substring(1, key.split(":")[1].length() - 1); - PreparedQueryObject pQuery = new PreparedQueryObject(); - pQuery.appendQueryString( - "select endpoint, username, password from admin.callback_api where changes = ? allow filtering"); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), tableName)); - ResultSet rs = MusicCore.get(pQuery); - Row row = rs.all().get(0); - if(row != null) { - String endpoint = row.getString("endpoint"); - String username = row.getString("username"); - String password = row.getString("password"); - logger.info("Notifying the changes to endpoint: "+endpoint); - WebResource webResource = client.resource(endpoint); - String authData = username+":"+password; - byte[] plainCredsBytes = authData.getBytes(); - byte[] base64CredsBytes = Base64.encode(plainCredsBytes); - String base64Creds = new String(base64CredsBytes); - ClientResponse response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json") - .post(ClientResponse.class, inputJsonObj); - if(response.getStatus() != 200){ - logger.error("Exception while notifying"); - } - } - break; + public Response callbackOps(final JSONObject inputJsonObj) { + // {"keyspace":"conductor","full_table":"conductor.plans","changeValue":{"conductor.plans.status":"Who??","position":"3"},"operation":"update","table_name":"plans","primary_key":"3"} + Map<String, Object> resultMap = new HashMap<>(); + new Thread(new Runnable() { + public void run() { + makeAsyncCall(inputJsonObj); + } + }).start(); + + return Response.status(Status.OK).entity(resultMap).build(); + } + + private Response makeAsyncCall(JSONObject inputJsonObj) { + + Map<String, Object> resultMap = new HashMap<>(); + try { + logger.info("Got notification: " + inputJsonObj.getData()); + String dataStr = inputJsonObj.getData(); + ObjectMapper mapper = new ObjectMapper(); + JSONCallbackResponse jsonResponse = mapper.readValue(dataStr, JSONCallbackResponse.class); + String operation = jsonResponse.getOperation(); + Map<String, String> changeValueMap = jsonResponse.getChangeValue(); + String primaryKey = jsonResponse.getPrimary_key(); + //String full_table = jsonResponse.getFull_table(); //conductor.plans + String field_value = changeValueMap.get("field_value"); + if(field_value == null) + field_value = jsonResponse.getFull_table(); + //Get from Cache + JsonCallback baseRequestObj = CachingUtil.getCallBackCache(field_value); + //baseRequestObj = null; + + if(baseRequestObj == null) { + logger.info("Is cache empty? reconstructing Object from cache.."); + baseRequestObj = constructJsonCallbackFromCache(field_value); } - } - } catch(Exception e) { + if(baseRequestObj == null) { + resultMap.put("Exception", + "Oops. Something went wrong. Please make sure Callback properties are onboarded."); + logger.error(EELFLoggerDelegate.errorLogger, "", AppMessages.INCORRECTDATA, + ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR); + return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); + } + + String key = "admin" + "." + "notification_master" + "." + baseRequestObj.getUuid(); + String lockId = MusicCore.createLockReference(key); + long lockCreationTime = System.currentTimeMillis(); + ReturnType lockAcqResult = MusicCore.acquireLock(key, lockId); + if(! lockAcqResult.getResult().toString().equals("SUCCESS")) { + logger.info("Some other node is notifying the caller.."); + } + + logger.info(operation+ ": Operation :: changeValue: "+changeValueMap); + if(operation.equals("update")) { + String notifyWhenChangeIn = baseRequestObj.getNotifyWhenChangeIn(); // conductor.plans.status + logger.info("**********notifyWhenChangeIn: "+notifyWhenChangeIn); + if(field_value.equals(notifyWhenChangeIn)) { + logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint()); + notifyCallBackAppl(jsonResponse, baseRequestObj); + } + + } else if(operation.equals("delete")) { + String notifyWhenDeletesIn = baseRequestObj.getNotifyWhenDeletesIn(); // conductor.plans.status + logger.info("**********notifyWhenDeletesIn: "+notifyWhenDeletesIn); + if(field_value.equals(notifyWhenDeletesIn)) { + logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint()); + notifyCallBackAppl(jsonResponse, baseRequestObj); + } + } else if(operation.equals("insert")) { + String notifyWhenInsertsIn = baseRequestObj.getNotifyWhenInsertsIn(); // conductor.plans.status + logger.info("**********notifyWhenInsertsIn: "+notifyWhenInsertsIn); + if(field_value.equals(notifyWhenInsertsIn)) { + logger.info("********** notifyting the endpoint: "+baseRequestObj.getApplicationNotificationEndpoint()); + notifyCallBackAppl(jsonResponse, baseRequestObj); + } + } + MusicCore.releaseLock(lockId, true); + } catch(Exception e) { e.printStackTrace(); logger.info("Exception..."); } - return "Success"; - } + logger.info(">>> callback is completed. Notification was sent from Music..."); + return Response.status(Status.OK).entity(resultMap).build(); + } + + private void notifyCallBackAppl(JSONCallbackResponse jsonResponse, JsonCallback baseRequestObj) { + int notifytimeout = MusicUtil.getNotifyTimeout(); + int notifyinterval = MusicUtil.getNotifyInterval(); + String endpoint = baseRequestObj.getApplicationNotificationEndpoint(); + String username = baseRequestObj.getApplicationUsername(); + String password = baseRequestObj.getApplicationPassword(); + JsonNotification jsonNotification = constructJsonNotification(jsonResponse, baseRequestObj); + jsonNotification.setOperation_type(jsonResponse.getOperation()); + logger.info("Response sent is: "+jsonNotification); + WebResource webResource = client.resource(endpoint); + String authData = username+":"+password; + byte[] plainCredsBytes = authData.getBytes(); + byte[] base64CredsBytes = Base64.encode(plainCredsBytes); + String base64Creds = new String(base64CredsBytes); + Map<String, String> response_body = baseRequestObj.getResponseBody(); + ClientResponse response = null; + try { + response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json").type("application/json") + .post(ClientResponse.class, jsonNotification); + } catch (com.sun.jersey.api.client.ClientHandlerException chf) { + boolean ok = false; + logger.info("Is Service down?"); + long now= System.currentTimeMillis(); + long end = now+notifytimeout; + while(! ok) { + logger.info("retrying since error in notifying callback.."); + try { + response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json").type("application/json") + .post(ClientResponse.class, jsonNotification); + if(response.getStatus() == 200) ok = true; + }catch (Exception e) { + System.err.println("Is response null: "+response==null); + System.err.println("Retry until "+(end-System.currentTimeMillis())); + if(response == null && System.currentTimeMillis() < end) ok = false; + else ok = true; + try{ Thread.sleep(notifyinterval); } catch(Exception e1) {} + } + } + } + response.bufferEntity(); + String responseStr = response.getEntity(String.class); + logger.info(">>>>> Response from Notified client: "+responseStr); + + if(response.getStatus() != 200){ + long now= System.currentTimeMillis(); + long end = now+30000; + while(response.getStatus() != 200 && System.currentTimeMillis() < end) { + logger.info("retrying since error in notifying callback.."); + response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json").type("application/json") + .post(ClientResponse.class, jsonNotification); + } + logger.error("Exception while notifying.. "+response.getStatus()); + } + } + + private JsonNotification constructJsonNotification(JSONCallbackResponse jsonResponse, JsonCallback baseRequestObj) { + + JsonNotification jsonNotification = new JsonNotification(); + try { + jsonNotification.setNotify_field(baseRequestObj.getNotifyOn()); + jsonNotification.setEndpoint(baseRequestObj.getApplicationNotificationEndpoint()); + jsonNotification.setUsername(baseRequestObj.getApplicationUsername()); + jsonNotification.setPassword(baseRequestObj.getApplicationPassword()); + String pkValue = jsonResponse.getPrimary_key(); + + String[] fullNotifyArr = baseRequestObj.getNotifyOn().split(":"); + + String[] tableArr = fullNotifyArr[0].split("\\."); + TableMetadata tableInfo = MusicCore.returnColumnMetadata(tableArr[0], tableArr[1]); + DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType(); + String primaryId = tableInfo.getPrimaryKey().get(0).getName(); + logger.info(">>>>>>> Primary Id: "+primaryId); + + Map<String, String> responseBodyMap = baseRequestObj.getResponseBody(); + Set<String> keySet = responseBodyMap.keySet(); + /*for (Map.Entry<String, Object> entry : valuesMap.entrySet()) { + DataType colType = null; + try { + colType = tableInfo.getColumn(entry.getKey()).getType(); + } catch(NullPointerException ex) { + + } + }*/ + + + String cql = "select "; + for(String keys: keySet) { + cql = cql + keys + ","; + } + cql = cql.substring(0, cql.length()-1); + cql = cql + " FROM "+fullNotifyArr[0]+" WHERE "+primaryId+" = ?"; + logger.info("CQL in constructJsonNotification: "+cql); + PreparedQueryObject pQuery = new PreparedQueryObject(); + pQuery.appendQueryString(cql); + pQuery.addValue(MusicUtil.convertToActualDataType(primaryIdType, pkValue)); + Row row = MusicCore.get(pQuery).one(); + Map<String, String> newMap = new HashMap<>(); + if(row != null) { + for(String keys: keySet) { + String value = null; + logger.info("responseBodyMap: "+responseBodyMap.toString()); + logger.info(">>>>>>>> converting <<<<<<<<<<<< "+keys + " : "+responseBodyMap.get(keys)+ ":" +responseBodyMap.get(keys).equals("uuid")); + if(responseBodyMap.get(keys).equals("uuid")) + value = row.getUUID(keys).toString(); + else if (responseBodyMap.get(keys).equals("text")) + value = row.getString(keys); + /*else if (responseBodyMap.get(keys).contains("int")) + value = row.getLong(keys).toString();*/ + newMap.put(keys, value); + } + } + if("delete".equals(jsonResponse.getOperation())) { + newMap.put(primaryId, pkValue); + } + jsonNotification.setResponse_body(newMap); + } catch(Exception e) { + e.printStackTrace(); + } + return jsonNotification; + } + + private JsonCallback constructJsonCallbackFromCache(String fullTable) throws Exception{ + PreparedQueryObject pQuery = new PreparedQueryObject(); + JsonCallback jsonCallback = new JsonCallback(); + String cql = + "select id, endpoint_userid, endpoint_password, notify_to_endpoint, notify_insert_on," + + " notify_delete_on, notify_update_on, request from admin.notification_master where notifyon = ? allow filtering"; + pQuery.appendQueryString(cql); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), fullTable)); + logger.info("Query: "+pQuery.getQuery() + " : "+fullTable); + + Row row = MusicCore.get(pQuery).one(); + if(row != null) { + String endpoint = row.getString("notify_to_endpoint"); + String username = row.getString("endpoint_userid"); + String password = row.getString("endpoint_password"); + String insert = row.getString("notify_insert_on"); + String delete = row.getString("notify_delete_on"); + String update = row.getString("notify_update_on"); + String request = row.getString("request"); + String uuid = row.getUUID("id").toString(); + jsonCallback.setApplicationNotificationEndpoint(endpoint); + jsonCallback.setApplicationPassword(password); + jsonCallback.setApplicationUsername(username); + jsonCallback.setNotifyOn(fullTable); + jsonCallback.setNotifyWhenInsertsIn(insert); + jsonCallback.setNotifyWhenDeletesIn(delete); + jsonCallback.setNotifyWhenChangeIn(update); + jsonCallback.setUuid(uuid); + logger.info("From DB. Saved request_body: "+request); + request = request.substring(1, request.length()-1); + String[] keyValuePairs = request.split(","); + Map<String,String> responseBody = new HashMap<>(); + + for(String pair : keyValuePairs) { + String[] entry = pair.split("="); + String val = ""; + if(entry.length == 2) + val = entry[1]; + responseBody.put(entry[0], val); + } + logger.info("After parsing. Saved request_body: "+responseBody); + jsonCallback.setResponseBody(responseBody); + } + return jsonCallback; + } @POST - @Path("/addCallback") + @Path("/onboardCallback") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) - public Response addCallback(JsonCallback jsonCallback) throws Exception { + public Response addCallback(JsonNotification jsonNotification) { Map<String, Object> resultMap = new HashMap<>(); ResponseBuilder response = Response.noContent().header("X-latestVersion", MusicUtil.getVersion()); - String username = jsonCallback.getApplicationUsername(); - String password = jsonCallback.getApplicationPassword(); - String endpoint = jsonCallback.getApplicationNotificationEndpoint(); - String changes = jsonCallback.getNotifyWhenChangeIn(); - String inserts = jsonCallback.getNotifyWhenInsertsIn(); - String deletes = jsonCallback.getNotifyWhenDeletesIn(); + String username = jsonNotification.getUsername(); + String password = jsonNotification.getPassword(); + String endpoint = jsonNotification.getEndpoint(); + String notify_field = jsonNotification.getNotify_field(); + Map<String, String> responseBody = jsonNotification.getResponse_body(); + + String[] allFields = notify_field.split(":"); + String inserts = null; + String updates = null; + String deletes = null; + if(allFields.length >= 2) { + inserts = updates = notify_field; + } else if(allFields.length == 1) { + inserts = deletes = notify_field;; + } + PreparedQueryObject pQuery = new PreparedQueryObject(); - if (username == null || password == null || endpoint == null || changes == null || inserts == null || deletes == null) { + /*if (username == null || password == null || endpoint == null) { logger.error(EELFLoggerDelegate.errorLogger, "", AppMessages.MISSINGINFO, ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR); resultMap.put("Exception", "Please check the request parameters. Some of the required values are missing."); return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); - } + }*/ String uuid = CachingUtil.generateUUID(); try { - pQuery.appendQueryString( - "INSERT INTO admin.callback_api (uuid, username, password, endpoint, " - + "changes, inserts, deletes) VALUES (?,?,?,?,?,?,?)"); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.uuid(), uuid)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), username)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), password)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), endpoint)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), changes)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), inserts)); - pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), deletes)); - MusicCore.eventualPut(pQuery); - - Map<String, String> jsonMap = new HashMap<>(); - jsonMap.put("username", username); - jsonMap.put("password", password); - jsonMap.put("endpoint", endpoint); - jsonMap.put("changes", changes); - jsonMap.put("inserts", inserts); - jsonMap.put("deletes", deletes); - + pQuery.appendQueryString( + "INSERT INTO admin.notification_master (id, endpoint_userid, endpoint_password, notify_to_endpoint, " + + "notifyon, notify_insert_on, notify_delete_on, notify_update_on, request, current_notifier) VALUES (?,?,?,?,?,?,?,?,?,?)"); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.uuid(), uuid)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), username)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), password)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), endpoint)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), notify_field)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), inserts)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), deletes)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), updates)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), responseBody)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), "UNKNOWN")); + MusicCore.nonKeyRelatedPut(pQuery, MusicUtil.EVENTUAL); + JsonCallback jsonCallback = new JsonCallback(); + jsonCallback.setUuid(uuid); + jsonCallback.setApplicationNotificationEndpoint(endpoint); + jsonCallback.setApplicationPassword(password); + jsonCallback.setApplicationUsername(username); + jsonCallback.setNotifyOn(notify_field); + jsonCallback.setNotifyWhenChangeIn(updates); + jsonCallback.setNotifyWhenDeletesIn(deletes); + jsonCallback.setNotifyWhenInsertsIn(inserts); + jsonCallback.setResponseBody(responseBody); + CachingUtil.updateCallBackCache(notify_field, jsonCallback); + logger.info("Cache updated "); //callBackCache.put(jsonCallback.getApplicationName(), jsonMap); } catch (InvalidQueryException e) { logger.error(EELFLoggerDelegate.errorLogger,"Exception callback_api table not configured."+e.getMessage()); - resultMap.put("Exception", "Please make sure admin.callback_api table is configured."); + resultMap.put("Exception", "Please make sure admin.notification_master table is configured."); + return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); + } catch(Exception e) { + e.printStackTrace(); + resultMap.put("Exception", "Exception Occured."); return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); } return response.status(Status.OK).entity(new JsonResponse(ResultType.SUCCESS).setMessage("Callback api successfully registered").toMap()).build(); } + + /*public String encodePwd(String password) { + return Base64.getEncoder().encodeToString(password.getBytes()); + } + + public String decodePwd(String password) { + byte[] bytes = Base64.getDecoder().decode(password); + return new String(bytes); + }*/ } diff --git a/src/main/java/org/onap/music/rest/RestMusicDataAPI.java b/src/main/java/org/onap/music/rest/RestMusicDataAPI.java index 30656350..f0c1663c 100755 --- a/src/main/java/org/onap/music/rest/RestMusicDataAPI.java +++ b/src/main/java/org/onap/music/rest/RestMusicDataAPI.java @@ -1380,7 +1380,7 @@ public class RestMusicDataAPI { if(results.getAvailableWithoutFetching() >0) { return response.status(Status.OK).entity(new JsonResponse(ResultType.SUCCESS).setDataResult(MusicCore.marshallResults(results)).toMap()).build(); } - return response.status(Status.OK).entity(new JsonResponse(ResultType.SUCCESS).setError("No data found").toMap()).build(); + return response.status(Status.OK).entity(new JsonResponse(ResultType.SUCCESS).setDataResult(MusicCore.marshallResults(results)).setError("No data found").toMap()).build(); } catch (MusicServiceException ex) { logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.UNKNOWNERROR ,ErrorSeverity.ERROR, ErrorTypes.MUSICSERVICEERROR); return response.status(Status.BAD_REQUEST).entity(new JsonResponse(ResultType.FAILURE).setError(ex.getMessage()).toMap()).build(); |