From 04766f01fb17e464d722d21d328264d139a2fcd0 Mon Sep 17 00:00:00 2001 From: Hesam Rahimi Date: Tue, 1 Mar 2022 22:34:38 -0500 Subject: Implementation of a RESTCONF client that can establish SSE connections to a RESTCONF server. This is for receiving notification events to support closed-loop for IBNs. Issue-ID: CCSDK-3595 Signed-off-by: Hesam Rahimi Change-Id: I2a3be01b03d889b41d4608011436d8b587a621e5 --- .../restconfdiscovery/DefaultRestSBDevice.java | 175 +++++++ .../sli/plugins/restconfdiscovery/DeviceId.java | 86 ++++ .../restconfdiscovery/RestConfSBController.java | 93 ++++ .../plugins/restconfdiscovery/RestSBDevice.java | 104 ++++ .../restconfdiscovery/RestconfDiscoveryNode.java | 538 ++++++++++++++++++++- .../RestconfNotificationEventListener.java | 15 + .../RestconfNotificationEventListenerImpl.java | 22 + .../restconfdiscovery/SvcLogicDiscoveryPlugin.java | 76 +++ 8 files changed, 1093 insertions(+), 16 deletions(-) create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java (limited to 'plugins/restconf-client/provider/src/main') diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java new file mode 100644 index 000000000..e0854f43c --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java @@ -0,0 +1,175 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Objects; +import java.util.Optional; + +public class DefaultRestSBDevice implements RestSBDevice{ + private static final String REST = "rest"; + private static final String COLON = ":"; + private final String ip; + private final int port; + private final String username; + private final String password; + private boolean isActive; + private String protocol; + private String url; + private boolean isProxy; + private final Optional testUrl; + private final Optional manufacturer; + private final Optional hwVersion; + private final Optional swVersion; + + public DefaultRestSBDevice(String ip, int port, String name, String password, + String protocol, String url, boolean isActive) { + this(ip, port, name, password, protocol, url, isActive, "", "", "", ""); + } + + public DefaultRestSBDevice(String ip, int port, String name, String password, + String protocol, String url, boolean isActive, String testUrl, String manufacturer, + String hwVersion, + String swVersion) { + Preconditions.checkNotNull(ip, "IP address cannot be null"); + Preconditions.checkArgument(port > 0, "Port address cannot be negative"); + Preconditions.checkNotNull(protocol, "protocol address cannot be null"); + this.ip = ip; + this.port = port; + this.username = name; + this.password = StringUtils.isEmpty(password) ? null : password; + this.isActive = isActive; + this.protocol = protocol; + this.url = StringUtils.isEmpty(url) ? null : url; + this.manufacturer = StringUtils.isEmpty(manufacturer) ? + Optional.empty() : Optional.ofNullable(manufacturer); + this.hwVersion = StringUtils.isEmpty(hwVersion) ? + Optional.empty() : Optional.ofNullable(hwVersion); + this.swVersion = StringUtils.isEmpty(swVersion) ? + Optional.empty() : Optional.ofNullable(swVersion); + this.testUrl = StringUtils.isEmpty(testUrl) ? + Optional.empty() : Optional.ofNullable(testUrl); + if (this.manufacturer.isPresent() + && this.hwVersion.isPresent() + && this.swVersion.isPresent()) { + this.isProxy = true; + } else { + this.isProxy = false; + } + } + + @Override + public String ip() { + return ip; + } + + @Override + public int port() { + return port; + } + + @Override + public String username() { + return username; + } + + @Override + public String password() { + return password; + } + + @Override + public DeviceId deviceId() { + try { + return DeviceId.deviceId(new URI(REST, ip + COLON + port, null)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Cannot create deviceID " + + REST + COLON + ip + + COLON + port, e); + } + } + + @Override + public void setActive(boolean active) { + isActive = active; + } + + @Override + public boolean isActive() { + return isActive; + } + + @Override + public String protocol() { + return protocol; + } + + @Override + public String url() { + return url; + } + + @Override + public boolean isProxy() { + return isProxy; + } + + @Override + public Optional testUrl() { + return testUrl; + } + + @Override + public Optional manufacturer() { + return manufacturer; + } + + @Override + public Optional hwVersion() { + return hwVersion; + } + + @Override + public Optional swVersion() { + return swVersion; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("url", url) + .add("testUrl", testUrl) + .add("protocol", protocol) + .add("username", username) + .add("port", port) + .add("ip", ip) + .add("manufacturer", manufacturer.orElse(null)) + .add("hwVersion", hwVersion.orElse(null)) + .add("swVersion", swVersion.orElse(null)) + .toString(); + + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof RestSBDevice)) { + return false; + } + RestSBDevice device = (RestSBDevice) obj; + return this.username.equals(device.username()) && this.ip.equals(device.ip()) && + this.port == device.port(); + + } + + @Override + public int hashCode() { + return Objects.hash(ip, port); + } +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java new file mode 100644 index 000000000..f0df0242a --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java @@ -0,0 +1,86 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.net.URI; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; + +public class DeviceId { + /** + * Represents either no device, or an unspecified device. + */ + public static final DeviceId NONE = deviceId("none:none"); + + private static final int DEVICE_ID_MAX_LENGTH = 1024; + + private final URI uri; + private final String str; + + // Public construction is prohibited + private DeviceId(URI uri) { + this.uri = uri; + this.str = uri.toString().toLowerCase(); + } + + + // Default constructor for serialization + protected DeviceId() { + this.uri = null; + this.str = null; + } + + /** + * Creates a device id using the supplied URI. + * + * @param uri device URI + * @return DeviceId + */ + public static DeviceId deviceId(URI uri) { + return new DeviceId(uri); + } + + /** + * Creates a device id using the supplied URI string. + * + * @param string device URI string + * @return DeviceId + */ + public static DeviceId deviceId(String string) { + checkArgument(string.length() <= DEVICE_ID_MAX_LENGTH, + "deviceId exceeds maximum length " + DEVICE_ID_MAX_LENGTH); + return deviceId(URI.create(string)); + } + + /** + * Returns the backing URI. + * + * @return backing URI + */ + public URI uri() { + return uri; + } + + @Override + public int hashCode() { + return str.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof DeviceId) { + final DeviceId that = (DeviceId) obj; + return this.getClass() == that.getClass() && + Objects.equals(this.str, that.str); + } + return false; + } + + @Override + public String toString() { + return str; + } + +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java new file mode 100644 index 000000000..aa36a3955 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java @@ -0,0 +1,93 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.util.Map; + +/** + * Abstraction of a RESTCONF controller. Serves as a one stop shop for obtaining + * RESTCONF southbound devices and (un)register listeners. + */ +public interface RestConfSBController { + + /** + * Returns all the devices known to this controller. + * + * @return map of devices + */ + Map getDevices(); + + /** + * Returns a device by node identifier. + * + * @param deviceInfo node identifier + * @return RestSBDevice rest device + */ + RestSBDevice getDevice(DeviceId deviceInfo); + + /** + * Returns a device by Ip and Port. + * + * @param ip device ip + * @param port device port + * @return RestSBDevice rest device + */ + RestSBDevice getDevice(String ip, int port); + + /** + * Adds a device to the device map. + * + * @param device to be added + */ + void addDevice(RestSBDevice device); + + /** + * Removes the device from the devices map. + * + * @param deviceId to be removed + */ + void removeDevice(DeviceId deviceId); + + /** + * This method is to be called by whoever is interested to receive + * Notifications from a specific device. It does a REST GET request + * with specified parameters to the device, and calls the provided + * callBackListener upon receiving notifications to notify the requester + * about notifications. + * + * @param device device to make the request to + * @param request url of the request + * @param mediaType format to retrieve the content in + * @param callBackListener method to call when notifications arrives + */ + void enableNotifications(DeviceId device, String request, String mediaType, + RestconfNotificationEventListener callBackListener); + + /** + * Registers a listener for notification events that occur to restconf + * devices. + * + * @param deviceId identifier of the device to which the listener is attached + * @param listener the listener to notify + */ + void addNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener); + + /** + * Unregisters the listener for the device. + * + * @param deviceId identifier of the device for which the listener + * is to be removed + * @param listener listener to be removed + */ + void removeNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener); + + /** + * Returns true if a listener has been installed to listen to RESTCONF + * notifications sent from a particular device. + * + * @param deviceId identifier of the device from which the notifications + * are generated + * @return true if listener is installed; false otherwise + */ + boolean isNotificationEnabled(DeviceId deviceId); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java new file mode 100644 index 000000000..206b02e90 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java @@ -0,0 +1,104 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.util.Optional; + +public interface RestSBDevice { + + /** + * Returns the ip of this device. + * + * @return ip + */ + String ip(); + + /** + * Returns the password of this device. + * + * @return port + */ + int port(); + + /** + * Returns the username of this device. + * + * @return username + */ + String username(); + + /** + * Returns the password of this device. + * + * @return password + */ + String password(); + + /** + * Returns the ONOS deviceID for this device. + * + * @return DeviceId + */ + DeviceId deviceId(); + + /** + * Sets or unsets the state of the device. + * + * @param active boolean + */ + void setActive(boolean active); + + /** + * Returns the state of this device. + * + * @return state + */ + boolean isActive(); + + /** + * Returns the protocol for the REST request, usually HTTP o HTTPS. + * + * @return protocol + */ + String protocol(); + + /** + * Returns the url for the REST requests, to be used instead of IP and PORT. + * + * @return url + */ + String url(); + + /** + * Returns the proxy state of this device + * (if true, the device is proxying multiple ONOS devices). + * @return proxy state + */ + boolean isProxy(); + + /** + * Returns the url for the REST TEST requests. + * + * @return testUrl + */ + Optional testUrl(); + + /** + * The manufacturer of the rest device. + * + * @return the name of the manufacturer + */ + Optional manufacturer(); + + /** + * The hardware version of the rest device. + * + * @return the hardware version + */ + Optional hwVersion(); + + /** + * The software version of rest device. + * + * @return the software version. + */ + Optional swVersion(); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java index d6b93f744..bda6854c5 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java @@ -20,7 +20,12 @@ package org.onap.ccsdk.sli.plugins.restconfdiscovery; +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.glassfish.jersey.media.sse.EventInput; import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.InboundEvent; import org.glassfish.jersey.media.sse.SseFeature; import org.onap.ccsdk.sli.core.sli.SvcLogicContext; import org.onap.ccsdk.sli.core.sli.SvcLogicException; @@ -29,6 +34,7 @@ import org.onap.ccsdk.sli.plugins.restapicall.Parameters; import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode; import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -36,40 +42,63 @@ import javax.net.ssl.X509TrustManager; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.net.MalformedURLException; +import java.net.URL; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.HashSet; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.*; +import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties; import static org.slf4j.LoggerFactory.getLogger; /** * Representation of a plugin to subscribe for notification and then * to handle the received notifications. */ -public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { +public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin { private static final Logger log = getLogger(RestconfDiscoveryNode.class); - private ExecutorService executor = Executors.newCachedThreadPool(); - private Map runnableInfo = new ConcurrentHashMap<>(); - private RestconfApiCallNode restconfApiCallNode; - - private volatile Map subscriptionInfoMap = new ConcurrentHashMap<>(); - private volatile LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - + private static final String ROOT_RESOURCE = "/restconf"; private static final String SUBSCRIBER_ID = "subscriberId"; private static final String RESPONSE_CODE = "response-code"; private static final String RESPONSE_PREFIX = "responsePrefix"; private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" + "ications:establish-subscription.output.identifier"; + private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier"; private static final String RESPONSE_CODE_200 = "200"; private static final String SSE_URL = "sseConnectURL"; + private static final String REST_API_URL = "restapiUrl"; + private static final String RESOURCE_PATH_PREFIX = "/data/"; + private static final String NOTIFICATION_PATH_PREFIX = "/streams/"; + private static final String DEVICE_IP = "deviceIp"; + private static final String DEVICE_PORT = "devicePort"; + private static final String DOUBLESLASH = "//"; + private static final String COLON = ":"; + + private RestconfApiCallNode restconfApiCallNode; + private RestapiCallNode restapiCallNode = new RestapiCallNode(); + private volatile Map subscriptionInfoMap = new ConcurrentHashMap<>(); + private volatile LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); + private Map> + restconfNotificationListenerMap = new ConcurrentHashMap<>(); + private Map + runnableTable = new ConcurrentHashMap<>(); + private Map subscribedDevicesTable = new ConcurrentHashMap<>(); + private Map> eventQMap = new ConcurrentHashMap<>(); + private Map + processorRunnableTable = new ConcurrentHashMap<>(); + private Map runnableInfo = new ConcurrentHashMap<>(); + private final Map deviceMap = new ConcurrentHashMap<>(); + private final Map clientMap = new ConcurrentHashMap<>(); + private ExecutorService executor = Executors.newCachedThreadPool(); /** * Creates an instance of RestconfDiscoveryNode and starts processing of @@ -78,14 +107,91 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { * @param r restconf api call node */ public RestconfDiscoveryNode(RestconfApiCallNode r) { + log.info("inside RestconfDiscoveryNode Constructor"); this.restconfApiCallNode = r; - ExecutorService e = Executors.newFixedThreadPool(20); - EventProcessor p = new EventProcessor(this); - for (int i = 0; i < 20; ++i) { - e.execute(p); + this.activate(); +// ExecutorService e = Executors.newFixedThreadPool(20); +// EventProcessor p = new EventProcessor(this); +// for (int i = 0; i < 20; ++i) { +// e.execute(p); +// } + } + + public void activate() { + log.info("RESTCONF SBI Started"); + } + + public void deactivate() { + log.info("RESTCONF SBI Stopped"); + executor.shutdown(); + this.getClientMap().clear(); + this.getDeviceMap().clear(); + } + + public Map getDeviceMap() { + return deviceMap; + } + + public Map getClientMap() { + return clientMap; + } + + @Override + public Map getDevices() { + log.trace("RESTCONF SBI::getDevices"); + return ImmutableMap.copyOf(deviceMap); + } + + @Override + public RestSBDevice getDevice(DeviceId deviceInfo) { + log.trace("RESTCONF SBI::getDevice with deviceId"); + return deviceMap.get(deviceInfo); + } + + @Override + public RestSBDevice getDevice(String ip, int port) { + log.trace("RESTCONF SBI::getDevice with ip and port"); + try { + if (!deviceMap.isEmpty()) { + return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get(); + } + } catch (NoSuchElementException noSuchElementException) { + log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port); + } + return null; + } + + @Override + public void addDevice(RestSBDevice device) { + log.trace("RESTCONF SBI::addDevice"); + if (!deviceMap.containsKey(device.deviceId())) { + if (device.username() != null) { + String username = device.username(); + String password = device.password() == null ? "" : device.password(); + // authenticate(client, username, password); + } + BlockingQueue newBlockingQueue = new LinkedBlockingQueue<>(); + eventQMap.put(device.deviceId(), newBlockingQueue); + InternalRestconfEventProcessorRunnable eventProcessorRunnable = + new InternalRestconfEventProcessorRunnable(device.deviceId()); + processorRunnableTable.put(device.deviceId(), eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable is created and is going for execute"); + executor.execute(eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable was sent for execute"); + deviceMap.put(device.deviceId(), device); + } else { + log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId()); } } + @Override + public void removeDevice(DeviceId deviceId) { + log.trace("RESTCONF SBI::removeDevice"); + eventQMap.remove(deviceId); + clientMap.remove(deviceId); + deviceMap.remove(deviceId); + } + @Override public void establishSubscription(Map paramMap, SvcLogicContext ctx) throws SvcLogicException { @@ -106,6 +212,62 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { } } + @Override + public void establishSubscriptionOnly(Map paramMap, SvcLogicContext ctx) + throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + if (subscriberId == null) { + throw new SvcLogicException("Subscriber Id is null"); + } + + String subscribeUrlString = paramMap.get(REST_API_URL); + URL subscribeUrl = null; + RestSBDevice dev = null; + try { + subscribeUrl = new URL(subscribeUrlString); + dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now."); + //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap + dev = new DefaultRestSBDevice(subscribeUrl.getHost(), + subscribeUrl.getPort(), "onos", "rocks", "http", + subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true); + this.addDevice(dev); + } + + if (!subscribedDevicesTable.containsKey(dev.deviceId())) { + log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " + + "Trying to subscribe it now..."); + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + // TODO: save subscription id and subscriber in MYSQL + String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx); + log.info("establishSubscriptionOnly::Subscription is done successfully and " + + "the output.identifier is: {}", id); + log.info("establishSubscriptionOnly::The subscriptionID returned by the server " + + "does not exist in the map. Adding it now..."); + subscribedDevicesTable.put(dev.deviceId(), id); + + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriptionId(id); + info.subscriberId(subscriberId); + subscriptionInfoMap.put(id, info); + + } + } + + } + @Override public void modifySubscription(Map paramMap, SvcLogicContext ctx) { // TODO: to be implemented @@ -252,6 +414,10 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE); } + String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX); + } + /** * Returns subscription id from event. * @@ -278,6 +444,34 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { return null; } + private String getUrlString(DeviceId deviceId, String request) { + RestSBDevice restSBDevice = deviceMap.get(deviceId); + if (restSBDevice == null) { + log.warn("getUrlString::restSbDevice cannot be NULL!"); + return ""; + } + if (restSBDevice.url() != null) { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request; + } else { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString() + + COLON + restSBDevice.port() + request; + } + } + + private String getSubscriptionIdFromDeviceId(DeviceId deviceId) { + if (subscribedDevicesTable.containsKey(deviceId)) { + return subscribedDevicesTable.get(deviceId); + } + return null; + } + + private BlockingQueue getEventQ(DeviceId deviceId) { + if (eventQMap.containsKey(deviceId)) { + return eventQMap.get(deviceId); + } + return null; + } + /** * Returns restconfApiCallNode. * @@ -311,4 +505,316 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { void eventQueue(LinkedBlockingQueue eventQueue) { this.eventQueue = eventQueue; } + + /** + * Establishes a persistent SSE connection between the client and the server. + * + * @param paramMap input paramter map + * @param ctx service logic context + */ + @Override + public void establishPersistentSseConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException { + + //TODO: FIXME: remove the instantiation of info; not useful + String subscriberId = paramMap.get(SUBSCRIBER_ID); + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriberId(subscriberId); + + String sseUrlString = paramMap.get(SSE_URL); + URL sseUrl = null; + RestSBDevice dev = null; + try { + sseUrl = new URL(sseUrlString); + dev = getDevice(sseUrl.getHost(), sseUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now."); + dev = new DefaultRestSBDevice(sseUrl.getHost(), + sseUrl.getPort(), "onos", "rocks", "http", + sseUrl.getHost() + ":" + sseUrl.getPort(), true); + this.addDevice(dev); + } + + if (isNotificationEnabled(dev.deviceId())) { + log.warn("establishPersistentSseConnection::notifications already enabled on device: {}", + dev.deviceId()); + return; + } + + if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) { + log.warn("This device {} has not yet been subscribed to receive notifications.", + dev.deviceId()); + return; + } + + RestconfNotificationEventListenerImpl myListener = + new RestconfNotificationEventListenerImpl(info); + enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener); + } + + @Override + public void enableNotifications(DeviceId device, String request, + String mediaType, + RestconfNotificationEventListener listener) { + if (isNotificationEnabled(device)) { + log.warn("enableNotifications::already enabled on device: {}", device); + return; + } + + request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX + + request; + + addNotificationListener(device, listener); + + GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType, + device); + runnableTable.put(device, runnable); + executor.execute(runnable); + } + + public void stopNotifications(DeviceId device) { + try { + runnableTable.get(device).terminate(); + processorRunnableTable.get(device).terminate(); + } catch (Exception ex) { + log.error("stopNotifications::Exception happened when terminating, ex: {}", ex); + } + log.info("stopNotifications::Runnable is now terminated"); + runnableTable.remove(device); + processorRunnableTable.remove(device); + restconfNotificationListenerMap.remove(device); + log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString()); + } + + @Override + public void deleteSubscriptionAndSseConnection(Map paramMap, SvcLogicContext ctx) { + String deleteSubscribeUrlString = paramMap.get(REST_API_URL); + URL deleteSubscribeUrl = null; + RestSBDevice dev = null; + try { + deleteSubscribeUrl = new URL(deleteSubscribeUrlString); + dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + String deviceIp = deleteSubscribeUrl.getHost(); + String devicePort = String.valueOf(deleteSubscribeUrl.getPort()); + log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}", + deviceIp, devicePort); + if (dev == null) { + log.error("deleteSubscriptionAndSseConnection::device does not exist in the map"); + return; + } + String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId()); + + if (subscriptionId != null) { + log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId); + log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request"); + try { + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed"); + stopNotifications(dev.deviceId()); + subscribedDevicesTable.remove(dev.deviceId()); + + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + subscriptionInfoMap.remove(id); + } + + } else { + log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull"); + } + } catch (SvcLogicException e) { + log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e); + } + } else { + log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed"); + } + } + + /** + * Notifies providers about incoming RESTCONF notification events. + */public class GetChunksRunnable implements Runnable { + private String request; + private String mediaType; + private DeviceId deviceId; + + private volatile boolean running = true; + + public void terminate() { + log.info("GetChunksRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + /** + * @param request request + * @param mediaType media type + * @param deviceId device identifier + */ + public GetChunksRunnable(String request, String mediaType, + DeviceId deviceId) { + this.request = request; + this.mediaType = mediaType; + this.deviceId = deviceId; + } + + @Override + public void run() { + log.trace("GetChunksRunnable.run()::threadID is: {} ...., running is: {}", + Thread.currentThread().getId(), running); + try { + Client client = ClientBuilder.newBuilder() + .register(SseFeature.class).build(); + WebTarget target = client.target(getUrlString(deviceId, request)); + log.trace("GetChunksRunnable.run()::target URI is {}", target.getUri().toString()); + Response response = target.request().get(); + EventInput eventInput = response.readEntity(EventInput.class); + log.trace("GetChunksRunnable.run()::after eventInput"); + String rcvdData = ""; + while (!eventInput.isClosed() && running) { + log.trace("GetChunksRunnable.run()::inside while ..."); + final InboundEvent inboundEvent = eventInput.read(); + log.trace("GetChunksRunnable.run()::after eventInput.read() ..."); + if (inboundEvent == null) { + // connection has been closed + log.info("GetChunksRunnable.run()::connection has been closed ..."); + break; + } + if (running) { + rcvdData = inboundEvent.readData(String.class); + BlockingQueue eventQ = getEventQ(deviceId); + if (eventQ != null) { + eventQ.add(rcvdData); + eventQMap.put(deviceId, eventQ); + log.trace("GetChunksRunnable.run()::eventQ got filled."); + } else { + log.error("GetChunksRunnable.run()::eventQ has not been initialized for this device {}", + deviceId); + } + } else { + log.info("GetChunksRunnable.run()::running has changed to false while eventInput.read() " + + "was blocked to receive new notifications"); + log.info("GetChunksRunnable.run()::the client is no longer interested to " + + "receive notifications."); + break; + } + } + if (!running) { + log.trace("GetChunksRunnable.run()::running is false! " + + "closing eventInput, threadID: {}", Thread.currentThread().getId()); + eventInput.close(); + response.close(); + client.close(); + log.info("GetChunksRunnable.run()::eventInput is closed in run()"); + } + } catch (Exception ex) { + log.info("GetChunksRunnable.run()::We got some exception: {}, threadID: {} ", ex, + Thread.currentThread().getId()); + } + log.trace("GetChunksRunnable.run()::after Runnable Try Catch. threadID: {} ", + Thread.currentThread().getId()); + } + } + + public class InternalRestconfEventProcessorRunnable implements Runnable { + + private volatile boolean running = true; + private DeviceId deviceId; + + public InternalRestconfEventProcessorRunnable(DeviceId deviceId) { + this.deviceId = deviceId; + } + + public void terminate() { + log.info("InternalRestconfEventProcessorRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + @Override + public void run() { + log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()"); + while (running) { + try { + if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) { + log.trace("InternalRestconfEventProcessorRunnable::waiting for take()"); + if (running) { + String eventJsonString = eventQMap.get(deviceId).take(); + log.trace("InternalRestconfEventProcessorRunnable::after take()"); + log.info("InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString); + Map param = convertToProperties(eventJsonString); + String idString = param.get("push-change-update.subscription-id"); + SubscriptionInfo info = subscriptionInfoMap().get(idString); + if (info != null) { + SvcLogicContext ctx = setContext(param); + SvcLogicGraphInfo callbackDG = info.callBackDG(); + callbackDG.executeGraph(ctx); + } + } else { + log.info("InternalRestconfEventProcessorRunnable.run()::running has changed to false " + + "while eventQ was blocked to process new notifications"); + log.info("InternalRestconfEventProcessorRunnable.run()::" + + "the client is no longer interested to receive notifications."); + break; + } + } + } catch (InterruptedException | SvcLogicException e) { + e.printStackTrace(); + } + } + } + private SvcLogicContext setContext(Map param) { + SvcLogicContext ctx = new SvcLogicContext(); + for (Map.Entry entry : param.entrySet()) { + ctx.setAttribute(entry.getKey(), entry.getValue()); + } + return ctx; + } + } + + public String discoverRootResource(DeviceId device) { + return ROOT_RESOURCE; + } + + @Override + public void addNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + Set listeners = + restconfNotificationListenerMap.get(deviceId); + if (listeners == null) { + listeners = new HashSet<>(); + } + + listeners.add(listener); + + this.restconfNotificationListenerMap.put(deviceId, listeners); + } + + @Override + public void removeNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + Set listeners = + restconfNotificationListenerMap.get(deviceId); + if (listeners != null) { + listeners.remove(listener); + } + } + + public boolean isNotificationEnabled(DeviceId deviceId) { + return runnableTable.containsKey(deviceId); + } + } diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java new file mode 100644 index 000000000..292e17e46 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java @@ -0,0 +1,15 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +/** + * Notifies providers about incoming RESTCONF notification events. + */ +public interface RestconfNotificationEventListener { + + /** + * Handles the notification event. + * + * @param deviceId restconf device identifier + * @param event event payload + */ + void handleNotificationEvent(DeviceId deviceId, T event); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java new file mode 100644 index 000000000..fa2fa02d2 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java @@ -0,0 +1,22 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +public class RestconfNotificationEventListenerImpl implements + RestconfNotificationEventListener { + + private final Logger log = getLogger(getClass()); + SubscriptionInfo info; + + public RestconfNotificationEventListenerImpl(SubscriptionInfo info) { + this.info = info; + } + + @Override + public void handleNotificationEvent(DeviceId deviceId, String eventJsonString) { + log.info("New notification: {} for device: {}", + eventJsonString, deviceId.toString()); + } +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java index dfe8cd5b7..972fb2b55 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java @@ -61,6 +61,68 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { */ void establishSubscription(Map paramMap, SvcLogicContext ctx) throws SvcLogicException; + /** + * Allows directed graphs to subscribe to a restconf server to receive notifications from that server. + * @param paramMap HashMap of parameters passed by the DG to this function + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
parameterMandatory/Optionaldescriptionexample values
templateDirNameOptionalfull path to YANG directory that can be used to build a request/sdncopt/bvc/resconfapi/test
establishSubscriptionURLMandatoryurl to establish connection with serverhttps://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription
sseConnectURLMandatoryurl to setup SSE connection with serverhttps://127.0.0.1:8181/restconf/streams/yang-push-json
callbackDGMandatorycallback DG to process the received notificationResource-Discovery:handleSOTNTopology
filterURLOptionalurl which needs to be subscribed, if null subscribe to allhttp://example.com/sample-data/1.0
subscriptionTypeOptionaltype of subscription, periodic or onDataChangeonDataChange
updateFrequencyOptionalupdate frequency in milli seconds when subscription type is periodic1000
restapiUserOptionaluser name to use for http basic authenticationsdnc_ws
restapiPasswordOptionalunencrypted password to use for http basic authenticationplain_password
contentTypeOptionalhttp content type to set in the http headerusually application/json or application/xml
formatOptionalshould match request body formatjson or xml
responsePrefixOptionallocation the notification response will be written to in context memorytmp.restconfdiscovery.result
skipSendingOptionaltrue or false
convertResponse Optionalwhether the response should be convertedtrue or false
customHttpHeadersOptionala list additional http headers to be passed in, follow the format in the exampleX-CSI-MessageId=messageId,headerFieldName=headerFieldValue
dumpHeadersOptionalwhen true writes http header content to context memorytrue or false
+ * @param ctx Reference to context memory + * @throws SvcLogicException + * @since 11.0.2 + * @see String#split(String, int) + */ + void establishSubscriptionOnly(Map paramMap, SvcLogicContext ctx) throws SvcLogicException; + + /** + * Allows directed graphs to establish a discovery subscription for a given subscriber. + * @param paramMap HashMap of parameters passed by the DG to this function + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
parameterMandatory/Optionaldescriptionexample values
templateDirNameOptionalfull path to YANG directory that can be used to build a request/sdncopt/bvc/resconfapi/test
establishSubscriptionURLMandatoryurl to establish connection with serverhttps://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription
sseConnectURLMandatoryurl to setup SSE connection with serverhttps://127.0.0.1:8181/restconf/streams/yang-push-json
callbackDGMandatorycallback DG to process the received notificationResource-Discovery:handleSOTNTopology
filterURLOptionalurl which needs to be subscribed, if null subscribe to allhttp://example.com/sample-data/1.0
subscriptionTypeOptionaltype of subscription, periodic or onDataChangeonDataChange
updateFrequencyOptionalupdate frequency in milli seconds when subscription type is periodic1000
restapiUserOptionaluser name to use for http basic authenticationsdnc_ws
restapiPasswordOptionalunencrypted password to use for http basic authenticationplain_password
contentTypeOptionalhttp content type to set in the http headerusually application/json or application/xml
formatOptionalshould match request body formatjson or xml
responsePrefixOptionallocation the notification response will be written to in context memorytmp.restconfdiscovery.result
skipSendingOptionaltrue or false
convertResponse Optionalwhether the response should be convertedtrue or false
customHttpHeadersOptionala list additional http headers to be passed in, follow the format in the exampleX-CSI-MessageId=messageId,headerFieldName=headerFieldValue
dumpHeadersOptionalwhen true writes http header content to context memorytrue or false
+ * @param ctx Reference to context memory + * @throws SvcLogicException + * @since 11.0.2 + * @see String#split(String, int) + */ + void establishPersistentSseConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException; + /** * Allows directed graphs to modify a discovery subscription for a given subscriber. * @param paramMap HashMap of parameters passed by the DG to this function @@ -107,4 +169,18 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { */ void deleteSubscription(Map paramMap, SvcLogicContext ctx); + /** + * Allows directed graphs to unsubscribe from a restconf server and to remove the persistent sse connection. + * @param paramMap HashMap of parameters passed by the DG to this function + * + * + * + * + * + *
parameterMandatory/Optionaldescriptionexample values
subscriberIdMandatorysubscription subscriber's identifiertopologyId/1111
+ * @param ctx Reference to context memory + * @throws SvcLogicException + */ + void deleteSubscriptionAndSseConnection(Map paramMap, SvcLogicContext ctx); + } -- cgit 1.2.3-korg