From 64b04604921670862993fb2a72a895a6209947d5 Mon Sep 17 00:00:00 2001 From: Thomas Nelson Jr Date: Wed, 8 Aug 2018 00:17:33 +0000 Subject: Updates to Callback Api and Connection Change-Id: I6a3f0537a616ae4d54b47fa2c70ba5128e39f123 Issue-ID: MUSIC-92 Signed-off-by: Thomas Nelson Jr --- .../org/onap/music/datastore/MusicDataStore.java | 53 ++++++++-- .../music/datastore/jsonobjects/JsonCallback.java | 115 +++++++++++++++++++++ src/main/java/org/onap/music/main/MusicUtil.java | 56 +++++++++- .../org/onap/music/main/PropertiesListener.java | 3 + .../org/onap/music/rest/RestMusicAdminAPI.java | 114 +++++++++++++++++++- 5 files changed, 325 insertions(+), 16 deletions(-) create mode 100644 src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java (limited to 'src/main/java/org/onap') diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java index 563e07f5..7557247d 100644 --- a/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -42,8 +42,10 @@ import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnDefinitions.Definition; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.DataType; +import com.datastax.driver.core.HostDistance; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PoolingOptions; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; @@ -149,16 +151,35 @@ public class MusicDataStore { * clusters. */ private void connectToCassaCluster() { - Iterator it = getAllPossibleLocalIps().iterator(); + Iterator it = getAllPossibleLocalIps().iterator(); String address = "localhost"; + String[] addresses = null; + address = MusicUtil.getMyCassaHost(); + addresses = address.split(","); + logger.info(EELFLoggerDelegate.applicationLogger, "Connecting to cassa cluster: Iterating through possible ips:" + getAllPossibleLocalIps()); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions + .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) + .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); while (it.hasNext()) { try { - cluster = Cluster.builder().withPort(9042) - .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - .addContactPoint(address).build(); + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } + else + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .addContactPoints(addresses).build(); + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); @@ -185,9 +206,27 @@ public class MusicDataStore { * @param address */ private void connectToCassaCluster(String address) throws MusicServiceException { - cluster = Cluster.builder().withPort(9042) - .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - .addContactPoint(address).build(); + String[] addresses = null; + addresses = address.split(","); + PoolingOptions poolingOptions = new PoolingOptions(); + poolingOptions + .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) + .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } + else { + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); diff --git a/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java b/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java new file mode 100644 index 00000000..42b12f10 --- /dev/null +++ b/src/main/java/org/onap/music/datastore/jsonobjects/JsonCallback.java @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START========================================== + * org.onap.music + * =================================================================== + * Copyright (c) 2017 AT&T Intellectual Property + * =================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ============LICENSE_END============================================= + * ==================================================================== + */ +package org.onap.music.datastore.jsonobjects; + +import java.io.Serializable; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +@ApiModel(value = "JsonCallback", description = "Json model for callback") +@JsonIgnoreProperties(ignoreUnknown = true) +public class JsonCallback implements Serializable { + private String applicationName; + private String applicationUsername; + private String applicationPassword; + private String applicationNotificationEndpoint; + private String notifyOn; + private String notifyWhenChangeIn; + private String notifyWhenInsertsIn; + private String notifyWhenDeletesIn; + + @ApiModelProperty(value = "application name") + public String getApplicationName() { + return applicationName; + } + + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; + } + + @ApiModelProperty(value = "notify On") + public String getNotifyOn() { + return notifyOn; + } + + public void setNotifyOn(String notifyOn) { + this.notifyOn = notifyOn; + } + + @ApiModelProperty(value = "application User name") + public String getApplicationUsername() { + return applicationUsername; + } + + public void setApplicationUsername(String applicationUsername) { + this.applicationUsername = applicationUsername; + } + + @ApiModelProperty(value = "application password") + public String getApplicationPassword() { + return applicationPassword; + } + + public void setApplicationPassword(String applicationPassword) { + this.applicationPassword = applicationPassword; + } + + @ApiModelProperty(value = "application notification endpoint") + public String getApplicationNotificationEndpoint() { + return applicationNotificationEndpoint; + } + + public void setApplicationNotificationEndpoint(String applicationNotificationEndpoint) { + this.applicationNotificationEndpoint = applicationNotificationEndpoint; + } + + @ApiModelProperty(value = "notify when updates") + public String getNotifyWhenChangeIn() { + return notifyWhenChangeIn; + } + + public void setNotifyWhenChangeIn(String notifyWhenChangeIn) { + this.notifyWhenChangeIn = notifyWhenChangeIn; + } + + @ApiModelProperty(value = "notify when inserts") + public String getNotifyWhenInsertsIn() { + return notifyWhenInsertsIn; + } + + public void setNotifyWhenInsertsIn(String notifyWhenInsertsIn) { + this.notifyWhenInsertsIn = notifyWhenInsertsIn; + } + + @ApiModelProperty(value = "notify when deletes") + public String getNotifyWhenDeletesIn() { + return notifyWhenDeletesIn; + } + + public void setNotifyWhenDeletesIn(String notifyWhenDeletesIn) { + this.notifyWhenDeletesIn = notifyWhenDeletesIn; + } + +} diff --git a/src/main/java/org/onap/music/main/MusicUtil.java b/src/main/java/org/onap/music/main/MusicUtil.java index a161fd56..2dd2f231 100755 --- a/src/main/java/org/onap/music/main/MusicUtil.java +++ b/src/main/java/org/onap/music/main/MusicUtil.java @@ -23,12 +23,15 @@ package org.onap.music.main; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Scanner; import java.util.StringTokenizer; import java.util.UUID; @@ -69,7 +72,8 @@ public class MusicUtil { public static final String UPSERT = "upsert"; public static final String USERID = "userId"; public static final String PASSWORD = "password"; - public static final String AUTHORIZATION = "Authorization"; + + public static final String AUTHORIZATION = "Authorization"; private static final String LOCALHOST = "localhost"; private static final String PROPERTIES_FILE = "/opt/app/music/etc/music.properties"; @@ -81,6 +85,8 @@ public class MusicUtil { private static String myZkHost = LOCALHOST; private static String myCassaHost = LOCALHOST; private static String defaultMusicIp = LOCALHOST; + private static int cassandraPort = 9042; + private static boolean debug = true; private static String version = "2.3.0"; private static String musicRestIp = LOCALHOST; @@ -88,7 +94,7 @@ public class MusicUtil { private static long defaultLockLeasePeriod = 6000; private static final String[] propKeys = new String[] { "zookeeper.host", "cassandra.host", "music.ip", "debug", "version", "music.rest.ip", "music.properties", "lock.lease.period", "id", "all.ids", "public.ip", - "all.pubic.ips", "cassandra.user", "cassandra.password", "aaf.endpoint.url" }; + "all.pubic.ips", "cassandra.user", "cassandra.password", "aaf.endpoint.url","cassandra.port" }; private static String cassName = "cassandra"; private static String cassPwd; @@ -98,8 +104,21 @@ public class MusicUtil { private MusicUtil() { throw new IllegalStateException("Utility Class"); } - - + /** + * + * @return cassandra port + */ + public static int getCassandraPort() { + return cassandraPort; + } + + /** + * set cassandra port + * @param cassandraPort + */ + public static void setCassandraPort(int cassandraPort) { + MusicUtil.cassandraPort = cassandraPort; + } /** * @return the cassName */ @@ -562,5 +581,34 @@ public class MusicUtil { return authValues; } + + public static void loadProperties() throws Exception { + Properties prop = new Properties(); + InputStream input = null; + try { + // load the properties file + input = MusicUtil.class.getClassLoader().getResourceAsStream("music.properties"); + prop.load(input); + } catch (Exception ex) { + logger.error(EELFLoggerDelegate.errorLogger, "Unable to find properties file."); + throw new Exception(); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + // get the property value and return it + MusicUtil.setMyCassaHost(prop.getProperty("cassandra.host")); + String zkHosts = prop.getProperty("zookeeper.host"); + MusicUtil.setMyZkHost(zkHosts); + MusicUtil.setCassName(prop.getProperty("cassandra.user")); + MusicUtil.setCassPwd(prop.getProperty("cassandra.password")); + MusicUtil.setCassandraPort(Integer.parseInt(prop.getProperty("cassandra.port"))); + + } } diff --git a/src/main/java/org/onap/music/main/PropertiesListener.java b/src/main/java/org/onap/music/main/PropertiesListener.java index 8b00e473..0619cd41 100755 --- a/src/main/java/org/onap/music/main/PropertiesListener.java +++ b/src/main/java/org/onap/music/main/PropertiesListener.java @@ -119,6 +119,9 @@ public class PropertiesListener implements ServletContextListener { case "aaf.endpoint.url": MusicUtil.setAafEndpointUrl(prop.getProperty(key)); break; + case "cassandra.port": + MusicUtil.setCassandraPort(Integer.parseInt(prop.getProperty(key))); + break; default: logger.error(EELFLoggerDelegate.errorLogger, "No case found for " + key); diff --git a/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java b/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java index d1e82337..71570b6c 100755 --- a/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java +++ b/src/main/java/org/onap/music/rest/RestMusicAdminAPI.java @@ -29,10 +29,12 @@ import java.util.Map; import java.util.UUID; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; @@ -41,20 +43,32 @@ import javax.ws.rs.core.Response.Status; import org.mindrot.jbcrypt.BCrypt; import org.onap.music.datastore.PreparedQueryObject; import org.onap.music.datastore.jsonobjects.JSONObject; +import org.onap.music.datastore.jsonobjects.JsonCallback; import org.onap.music.datastore.jsonobjects.JsonOnboard; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.format.AppMessages; import org.onap.music.eelf.logging.format.ErrorSeverity; import org.onap.music.eelf.logging.format.ErrorTypes; +//import org.onap.music.main.CacheAccess; import org.onap.music.main.CachingUtil; import org.onap.music.main.MusicCore; import org.onap.music.main.MusicUtil; import org.onap.music.main.ResultType; +import org.onap.music.response.jsonobjects.JsonResponse; + import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.Base64; + import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import org.apache.commons.jcs.JCS; +import org.apache.commons.jcs.access.CacheAccess; @Path("/v2/admin") // @Path("/v{version: [0-9]+}/admin") @@ -63,7 +77,6 @@ import io.swagger.annotations.ApiOperation; public class RestMusicAdminAPI { private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicAdminAPI.class); - /* * API to onboard an application with MUSIC. This is the mandatory first step. * @@ -370,14 +383,105 @@ public class RestMusicAdminAPI { return Response.status(Status.OK).entity(resultMap).build(); } + + Client client = Client.create(); @POST @Path("/callbackOps") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) - public String callbackOps(JSONObject inputJsonObj) throws Exception { - - System.out.println("Input JSON: "+inputJsonObj.getData()); - return "Success"; + public String callbackOps(JSONObject inputJsonObj) { + // trigger response {"full_table":"admin.race_winners","keyspace":"admin","name":"Siri","operation":"update","table_name":"race_winner","primary_key":"1"} + try { + logger.info("Got notification: " + inputJsonObj.getData()); + String dataStr = inputJsonObj.getData(); + String[] dataStrArr = dataStr.substring(1, dataStr.length() - 1).split(","); + + for (String key : dataStrArr) { + if (key.contains("full_table")) { + String tableName = key.split(":")[1].substring(1, key.split(":")[1].length() - 1); + PreparedQueryObject pQuery = new PreparedQueryObject(); + pQuery.appendQueryString( + "select endpoint, username, password from admin.callback_api where changes = ? allow filtering"); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), tableName)); + ResultSet rs = MusicCore.get(pQuery); + Row row = rs.all().get(0); + if(row != null) { + String endpoint = row.getString("endpoint"); + String username = row.getString("username"); + String password = row.getString("password"); + logger.info("Notifying the changes to endpoint: "+endpoint); + WebResource webResource = client.resource(endpoint); + String authData = username+":"+password; + byte[] plainCredsBytes = authData.getBytes(); + byte[] base64CredsBytes = Base64.encode(plainCredsBytes); + String base64Creds = new String(base64CredsBytes); + ClientResponse response = webResource.header("Authorization", "Basic " + base64Creds).accept("application/json") + .post(ClientResponse.class, inputJsonObj); + if(response.getStatus() != 200){ + logger.error("Exception while notifying"); + } + } + break; + } + } + } catch(Exception e) { + e.printStackTrace(); + logger.info("Exception..."); + } + return "Success"; + } + + @POST + @Path("/addCallback") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + public Response addCallback(JsonCallback jsonCallback) throws Exception { + Map resultMap = new HashMap<>(); + ResponseBuilder response = + Response.noContent().header("X-latestVersion", MusicUtil.getVersion()); + String username = jsonCallback.getApplicationUsername(); + String password = jsonCallback.getApplicationPassword(); + String endpoint = jsonCallback.getApplicationNotificationEndpoint(); + String changes = jsonCallback.getNotifyWhenChangeIn(); + String inserts = jsonCallback.getNotifyWhenInsertsIn(); + String deletes = jsonCallback.getNotifyWhenDeletesIn(); + PreparedQueryObject pQuery = new PreparedQueryObject(); + if (username == null || password == null || endpoint == null || changes == null || inserts == null || deletes == null) { + logger.error(EELFLoggerDelegate.errorLogger, "", AppMessages.MISSINGINFO, + ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR); + resultMap.put("Exception", + "Please check the request parameters. Some of the required values are missing."); + return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); + } + String uuid = CachingUtil.generateUUID(); + try { + pQuery.appendQueryString( + "INSERT INTO admin.callback_api (uuid, username, password, endpoint, " + + "changes, inserts, deletes) VALUES (?,?,?,?,?,?,?)"); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.uuid(), uuid)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), username)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), password)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), endpoint)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), changes)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), inserts)); + pQuery.addValue(MusicUtil.convertToActualDataType(DataType.text(), deletes)); + MusicCore.eventualPut(pQuery); + + Map jsonMap = new HashMap<>(); + jsonMap.put("username", username); + jsonMap.put("password", password); + jsonMap.put("endpoint", endpoint); + jsonMap.put("changes", changes); + jsonMap.put("inserts", inserts); + jsonMap.put("deletes", deletes); + + //callBackCache.put(jsonCallback.getApplicationName(), jsonMap); + } catch (InvalidQueryException e) { + logger.error(EELFLoggerDelegate.errorLogger,"Exception callback_api table not configured."+e.getMessage()); + resultMap.put("Exception", "Please make sure admin.callback_api table is configured."); + return Response.status(Status.BAD_REQUEST).entity(resultMap).build(); + } + return response.status(Status.OK).entity(new JsonResponse(ResultType.SUCCESS).setMessage("Callback api successfully registered").toMap()).build(); } } -- cgit 1.2.3-korg