summaryrefslogtreecommitdiffstats
path: root/plugins/restconf-client/provider/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/restconf-client/provider/src')
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java175
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java86
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java93
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java104
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java538
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java15
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java22
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java76
8 files changed, 1093 insertions, 16 deletions
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java
new file mode 100644
index 000000000..e0854f43c
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java
@@ -0,0 +1,175 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Objects;
+import java.util.Optional;
+
+public class DefaultRestSBDevice implements RestSBDevice{
+ private static final String REST = "rest";
+ private static final String COLON = ":";
+ private final String ip;
+ private final int port;
+ private final String username;
+ private final String password;
+ private boolean isActive;
+ private String protocol;
+ private String url;
+ private boolean isProxy;
+ private final Optional<String> testUrl;
+ private final Optional<String> manufacturer;
+ private final Optional<String> hwVersion;
+ private final Optional<String> swVersion;
+
+ public DefaultRestSBDevice(String ip, int port, String name, String password,
+ String protocol, String url, boolean isActive) {
+ this(ip, port, name, password, protocol, url, isActive, "", "", "", "");
+ }
+
+ public DefaultRestSBDevice(String ip, int port, String name, String password,
+ String protocol, String url, boolean isActive, String testUrl, String manufacturer,
+ String hwVersion,
+ String swVersion) {
+ Preconditions.checkNotNull(ip, "IP address cannot be null");
+ Preconditions.checkArgument(port > 0, "Port address cannot be negative");
+ Preconditions.checkNotNull(protocol, "protocol address cannot be null");
+ this.ip = ip;
+ this.port = port;
+ this.username = name;
+ this.password = StringUtils.isEmpty(password) ? null : password;
+ this.isActive = isActive;
+ this.protocol = protocol;
+ this.url = StringUtils.isEmpty(url) ? null : url;
+ this.manufacturer = StringUtils.isEmpty(manufacturer) ?
+ Optional.empty() : Optional.ofNullable(manufacturer);
+ this.hwVersion = StringUtils.isEmpty(hwVersion) ?
+ Optional.empty() : Optional.ofNullable(hwVersion);
+ this.swVersion = StringUtils.isEmpty(swVersion) ?
+ Optional.empty() : Optional.ofNullable(swVersion);
+ this.testUrl = StringUtils.isEmpty(testUrl) ?
+ Optional.empty() : Optional.ofNullable(testUrl);
+ if (this.manufacturer.isPresent()
+ && this.hwVersion.isPresent()
+ && this.swVersion.isPresent()) {
+ this.isProxy = true;
+ } else {
+ this.isProxy = false;
+ }
+ }
+
+ @Override
+ public String ip() {
+ return ip;
+ }
+
+ @Override
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public String username() {
+ return username;
+ }
+
+ @Override
+ public String password() {
+ return password;
+ }
+
+ @Override
+ public DeviceId deviceId() {
+ try {
+ return DeviceId.deviceId(new URI(REST, ip + COLON + port, null));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Cannot create deviceID " +
+ REST + COLON + ip +
+ COLON + port, e);
+ }
+ }
+
+ @Override
+ public void setActive(boolean active) {
+ isActive = active;
+ }
+
+ @Override
+ public boolean isActive() {
+ return isActive;
+ }
+
+ @Override
+ public String protocol() {
+ return protocol;
+ }
+
+ @Override
+ public String url() {
+ return url;
+ }
+
+ @Override
+ public boolean isProxy() {
+ return isProxy;
+ }
+
+ @Override
+ public Optional<String> testUrl() {
+ return testUrl;
+ }
+
+ @Override
+ public Optional<String> manufacturer() {
+ return manufacturer;
+ }
+
+ @Override
+ public Optional<String> hwVersion() {
+ return hwVersion;
+ }
+
+ @Override
+ public Optional<String> swVersion() {
+ return swVersion;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .omitNullValues()
+ .add("url", url)
+ .add("testUrl", testUrl)
+ .add("protocol", protocol)
+ .add("username", username)
+ .add("port", port)
+ .add("ip", ip)
+ .add("manufacturer", manufacturer.orElse(null))
+ .add("hwVersion", hwVersion.orElse(null))
+ .add("swVersion", swVersion.orElse(null))
+ .toString();
+
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof RestSBDevice)) {
+ return false;
+ }
+ RestSBDevice device = (RestSBDevice) obj;
+ return this.username.equals(device.username()) && this.ip.equals(device.ip()) &&
+ this.port == device.port();
+
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port);
+ }
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java
new file mode 100644
index 000000000..f0df0242a
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java
@@ -0,0 +1,86 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+import java.net.URI;
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class DeviceId {
+ /**
+ * Represents either no device, or an unspecified device.
+ */
+ public static final DeviceId NONE = deviceId("none:none");
+
+ private static final int DEVICE_ID_MAX_LENGTH = 1024;
+
+ private final URI uri;
+ private final String str;
+
+ // Public construction is prohibited
+ private DeviceId(URI uri) {
+ this.uri = uri;
+ this.str = uri.toString().toLowerCase();
+ }
+
+
+ // Default constructor for serialization
+ protected DeviceId() {
+ this.uri = null;
+ this.str = null;
+ }
+
+ /**
+ * Creates a device id using the supplied URI.
+ *
+ * @param uri device URI
+ * @return DeviceId
+ */
+ public static DeviceId deviceId(URI uri) {
+ return new DeviceId(uri);
+ }
+
+ /**
+ * Creates a device id using the supplied URI string.
+ *
+ * @param string device URI string
+ * @return DeviceId
+ */
+ public static DeviceId deviceId(String string) {
+ checkArgument(string.length() <= DEVICE_ID_MAX_LENGTH,
+ "deviceId exceeds maximum length " + DEVICE_ID_MAX_LENGTH);
+ return deviceId(URI.create(string));
+ }
+
+ /**
+ * Returns the backing URI.
+ *
+ * @return backing URI
+ */
+ public URI uri() {
+ return uri;
+ }
+
+ @Override
+ public int hashCode() {
+ return str.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof DeviceId) {
+ final DeviceId that = (DeviceId) obj;
+ return this.getClass() == that.getClass() &&
+ Objects.equals(this.str, that.str);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return str;
+ }
+
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java
new file mode 100644
index 000000000..aa36a3955
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java
@@ -0,0 +1,93 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+import java.util.Map;
+
+/**
+ * Abstraction of a RESTCONF controller. Serves as a one stop shop for obtaining
+ * RESTCONF southbound devices and (un)register listeners.
+ */
+public interface RestConfSBController {
+
+ /**
+ * Returns all the devices known to this controller.
+ *
+ * @return map of devices
+ */
+ Map<DeviceId, RestSBDevice> getDevices();
+
+ /**
+ * Returns a device by node identifier.
+ *
+ * @param deviceInfo node identifier
+ * @return RestSBDevice rest device
+ */
+ RestSBDevice getDevice(DeviceId deviceInfo);
+
+ /**
+ * Returns a device by Ip and Port.
+ *
+ * @param ip device ip
+ * @param port device port
+ * @return RestSBDevice rest device
+ */
+ RestSBDevice getDevice(String ip, int port);
+
+ /**
+ * Adds a device to the device map.
+ *
+ * @param device to be added
+ */
+ void addDevice(RestSBDevice device);
+
+ /**
+ * Removes the device from the devices map.
+ *
+ * @param deviceId to be removed
+ */
+ void removeDevice(DeviceId deviceId);
+
+ /**
+ * This method is to be called by whoever is interested to receive
+ * Notifications from a specific device. It does a REST GET request
+ * with specified parameters to the device, and calls the provided
+ * callBackListener upon receiving notifications to notify the requester
+ * about notifications.
+ *
+ * @param device device to make the request to
+ * @param request url of the request
+ * @param mediaType format to retrieve the content in
+ * @param callBackListener method to call when notifications arrives
+ */
+ void enableNotifications(DeviceId device, String request, String mediaType,
+ RestconfNotificationEventListener callBackListener);
+
+ /**
+ * Registers a listener for notification events that occur to restconf
+ * devices.
+ *
+ * @param deviceId identifier of the device to which the listener is attached
+ * @param listener the listener to notify
+ */
+ void addNotificationListener(DeviceId deviceId,
+ RestconfNotificationEventListener listener);
+
+ /**
+ * Unregisters the listener for the device.
+ *
+ * @param deviceId identifier of the device for which the listener
+ * is to be removed
+ * @param listener listener to be removed
+ */
+ void removeNotificationListener(DeviceId deviceId,
+ RestconfNotificationEventListener listener);
+
+ /**
+ * Returns true if a listener has been installed to listen to RESTCONF
+ * notifications sent from a particular device.
+ *
+ * @param deviceId identifier of the device from which the notifications
+ * are generated
+ * @return true if listener is installed; false otherwise
+ */
+ boolean isNotificationEnabled(DeviceId deviceId);
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java
new file mode 100644
index 000000000..206b02e90
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java
@@ -0,0 +1,104 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+import java.util.Optional;
+
+public interface RestSBDevice {
+
+ /**
+ * Returns the ip of this device.
+ *
+ * @return ip
+ */
+ String ip();
+
+ /**
+ * Returns the password of this device.
+ *
+ * @return port
+ */
+ int port();
+
+ /**
+ * Returns the username of this device.
+ *
+ * @return username
+ */
+ String username();
+
+ /**
+ * Returns the password of this device.
+ *
+ * @return password
+ */
+ String password();
+
+ /**
+ * Returns the ONOS deviceID for this device.
+ *
+ * @return DeviceId
+ */
+ DeviceId deviceId();
+
+ /**
+ * Sets or unsets the state of the device.
+ *
+ * @param active boolean
+ */
+ void setActive(boolean active);
+
+ /**
+ * Returns the state of this device.
+ *
+ * @return state
+ */
+ boolean isActive();
+
+ /**
+ * Returns the protocol for the REST request, usually HTTP o HTTPS.
+ *
+ * @return protocol
+ */
+ String protocol();
+
+ /**
+ * Returns the url for the REST requests, to be used instead of IP and PORT.
+ *
+ * @return url
+ */
+ String url();
+
+ /**
+ * Returns the proxy state of this device
+ * (if true, the device is proxying multiple ONOS devices).
+ * @return proxy state
+ */
+ boolean isProxy();
+
+ /**
+ * Returns the url for the REST TEST requests.
+ *
+ * @return testUrl
+ */
+ Optional<String> testUrl();
+
+ /**
+ * The manufacturer of the rest device.
+ *
+ * @return the name of the manufacturer
+ */
+ Optional<String> manufacturer();
+
+ /**
+ * The hardware version of the rest device.
+ *
+ * @return the hardware version
+ */
+ Optional<String> hwVersion();
+
+ /**
+ * The software version of rest device.
+ *
+ * @return the software version.
+ */
+ Optional<String> swVersion();
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java
index d6b93f744..bda6854c5 100644
--- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java
@@ -20,7 +20,12 @@
package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.glassfish.jersey.media.sse.EventInput;
import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
import org.onap.ccsdk.sli.core.sli.SvcLogicException;
@@ -29,6 +34,7 @@ import org.onap.ccsdk.sli.plugins.restapicall.Parameters;
import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@@ -36,40 +42,63 @@ import javax.net.ssl.X509TrustManager;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.HashSet;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.*;
+import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Representation of a plugin to subscribe for notification and then
* to handle the received notifications.
*/
-public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
+public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
private static final Logger log = getLogger(RestconfDiscoveryNode.class);
- private ExecutorService executor = Executors.newCachedThreadPool();
- private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
- private RestconfApiCallNode restconfApiCallNode;
-
- private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
- private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
-
+ private static final String ROOT_RESOURCE = "/restconf";
private static final String SUBSCRIBER_ID = "subscriberId";
private static final String RESPONSE_CODE = "response-code";
private static final String RESPONSE_PREFIX = "responsePrefix";
private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
"ications:establish-subscription.output.identifier";
+ private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
private static final String RESPONSE_CODE_200 = "200";
private static final String SSE_URL = "sseConnectURL";
+ private static final String REST_API_URL = "restapiUrl";
+ private static final String RESOURCE_PATH_PREFIX = "/data/";
+ private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
+ private static final String DEVICE_IP = "deviceIp";
+ private static final String DEVICE_PORT = "devicePort";
+ private static final String DOUBLESLASH = "//";
+ private static final String COLON = ":";
+
+ private RestconfApiCallNode restconfApiCallNode;
+ private RestapiCallNode restapiCallNode = new RestapiCallNode();
+ private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
+ private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
+ private Map<DeviceId, Set<RestconfNotificationEventListener>>
+ restconfNotificationListenerMap = new ConcurrentHashMap<>();
+ private Map<DeviceId, GetChunksRunnable>
+ runnableTable = new ConcurrentHashMap<>();
+ private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
+ private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
+ private Map<DeviceId, InternalRestconfEventProcessorRunnable>
+ processorRunnableTable = new ConcurrentHashMap<>();
+ private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
+ private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
+ private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
+ private ExecutorService executor = Executors.newCachedThreadPool();
/**
* Creates an instance of RestconfDiscoveryNode and starts processing of
@@ -78,15 +107,92 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
* @param r restconf api call node
*/
public RestconfDiscoveryNode(RestconfApiCallNode r) {
+ log.info("inside RestconfDiscoveryNode Constructor");
this.restconfApiCallNode = r;
- ExecutorService e = Executors.newFixedThreadPool(20);
- EventProcessor p = new EventProcessor(this);
- for (int i = 0; i < 20; ++i) {
- e.execute(p);
+ this.activate();
+// ExecutorService e = Executors.newFixedThreadPool(20);
+// EventProcessor p = new EventProcessor(this);
+// for (int i = 0; i < 20; ++i) {
+// e.execute(p);
+// }
+ }
+
+ public void activate() {
+ log.info("RESTCONF SBI Started");
+ }
+
+ public void deactivate() {
+ log.info("RESTCONF SBI Stopped");
+ executor.shutdown();
+ this.getClientMap().clear();
+ this.getDeviceMap().clear();
+ }
+
+ public Map<DeviceId, RestSBDevice> getDeviceMap() {
+ return deviceMap;
+ }
+
+ public Map<DeviceId, Client> getClientMap() {
+ return clientMap;
+ }
+
+ @Override
+ public Map<DeviceId, RestSBDevice> getDevices() {
+ log.trace("RESTCONF SBI::getDevices");
+ return ImmutableMap.copyOf(deviceMap);
+ }
+
+ @Override
+ public RestSBDevice getDevice(DeviceId deviceInfo) {
+ log.trace("RESTCONF SBI::getDevice with deviceId");
+ return deviceMap.get(deviceInfo);
+ }
+
+ @Override
+ public RestSBDevice getDevice(String ip, int port) {
+ log.trace("RESTCONF SBI::getDevice with ip and port");
+ try {
+ if (!deviceMap.isEmpty()) {
+ return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
+ }
+ } catch (NoSuchElementException noSuchElementException) {
+ log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
+ }
+ return null;
+ }
+
+ @Override
+ public void addDevice(RestSBDevice device) {
+ log.trace("RESTCONF SBI::addDevice");
+ if (!deviceMap.containsKey(device.deviceId())) {
+ if (device.username() != null) {
+ String username = device.username();
+ String password = device.password() == null ? "" : device.password();
+ // authenticate(client, username, password);
+ }
+ BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
+ eventQMap.put(device.deviceId(), newBlockingQueue);
+ InternalRestconfEventProcessorRunnable eventProcessorRunnable =
+ new InternalRestconfEventProcessorRunnable(device.deviceId());
+ processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
+ log.trace("addDevice::restconf event processor runnable is created and is going for execute");
+ executor.execute(eventProcessorRunnable);
+ log.trace("addDevice::restconf event processor runnable was sent for execute");
+ deviceMap.put(device.deviceId(), device);
+ } else {
+ log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
}
}
@Override
+ public void removeDevice(DeviceId deviceId) {
+ log.trace("RESTCONF SBI::removeDevice");
+ eventQMap.remove(deviceId);
+ clientMap.remove(deviceId);
+ deviceMap.remove(deviceId);
+ }
+
+ @Override
public void establishSubscription(Map<String, String> paramMap,
SvcLogicContext ctx) throws SvcLogicException {
String subscriberId = paramMap.get(SUBSCRIBER_ID);
@@ -107,6 +213,62 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
}
@Override
+ public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
+ throws SvcLogicException {
+ String subscriberId = paramMap.get(SUBSCRIBER_ID);
+ if (subscriberId == null) {
+ throw new SvcLogicException("Subscriber Id is null");
+ }
+
+ String subscribeUrlString = paramMap.get(REST_API_URL);
+ URL subscribeUrl = null;
+ RestSBDevice dev = null;
+ try {
+ subscribeUrl = new URL(subscribeUrlString);
+ dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
+ } catch (MalformedURLException e) {
+ log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+ return;
+ }
+
+ if (dev == null) {
+ log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
+ //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
+ dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
+ subscribeUrl.getPort(), "onos", "rocks", "http",
+ subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
+ this.addDevice(dev);
+ }
+
+ if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
+ log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " +
+ "Trying to subscribe it now...");
+ restapiCallNode.sendRequest(paramMap, ctx);
+ if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
+ // TODO: save subscription id and subscriber in MYSQL
+ String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+ log.info("establishSubscriptionOnly::Subscription is done successfully and " +
+ "the output.identifier is: {}", id);
+ log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
+ "does not exist in the map. Adding it now...");
+ subscribedDevicesTable.put(dev.deviceId(), id);
+
+ SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
+ paramMap.get("rpc"),
+ paramMap.get("version"),
+ paramMap.get("mode"));
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.callBackDG(callbackDG);
+ info.subscriptionId(id);
+ info.subscriberId(subscriberId);
+ subscriptionInfoMap.put(id, info);
+
+ }
+ }
+
+ }
+
+ @Override
public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
// TODO: to be implemented
}
@@ -252,6 +414,10 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
}
+ String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
+ return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
+ }
+
/**
* Returns subscription id from event.
*
@@ -278,6 +444,34 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
return null;
}
+ private String getUrlString(DeviceId deviceId, String request) {
+ RestSBDevice restSBDevice = deviceMap.get(deviceId);
+ if (restSBDevice == null) {
+ log.warn("getUrlString::restSbDevice cannot be NULL!");
+ return "";
+ }
+ if (restSBDevice.url() != null) {
+ return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
+ } else {
+ return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
+ + COLON + restSBDevice.port() + request;
+ }
+ }
+
+ private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
+ if (subscribedDevicesTable.containsKey(deviceId)) {
+ return subscribedDevicesTable.get(deviceId);
+ }
+ return null;
+ }
+
+ private BlockingQueue<String> getEventQ(DeviceId deviceId) {
+ if (eventQMap.containsKey(deviceId)) {
+ return eventQMap.get(deviceId);
+ }
+ return null;
+ }
+
/**
* Returns restconfApiCallNode.
*
@@ -311,4 +505,316 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
void eventQueue(LinkedBlockingQueue<String> eventQueue) {
this.eventQueue = eventQueue;
}
+
+ /**
+ * Establishes a persistent SSE connection between the client and the server.
+ *
+ * @param paramMap input paramter map
+ * @param ctx service logic context
+ */
+ @Override
+ public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
+
+ //TODO: FIXME: remove the instantiation of info; not useful
+ String subscriberId = paramMap.get(SUBSCRIBER_ID);
+ SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
+ paramMap.get("rpc"),
+ paramMap.get("version"),
+ paramMap.get("mode"));
+ SubscriptionInfo info = new SubscriptionInfo();
+ info.callBackDG(callbackDG);
+ info.subscriberId(subscriberId);
+
+ String sseUrlString = paramMap.get(SSE_URL);
+ URL sseUrl = null;
+ RestSBDevice dev = null;
+ try {
+ sseUrl = new URL(sseUrlString);
+ dev = getDevice(sseUrl.getHost(), sseUrl.getPort());
+ } catch (MalformedURLException e) {
+ log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
+ return;
+ }
+
+ if (dev == null) {
+ log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now.");
+ dev = new DefaultRestSBDevice(sseUrl.getHost(),
+ sseUrl.getPort(), "onos", "rocks", "http",
+ sseUrl.getHost() + ":" + sseUrl.getPort(), true);
+ this.addDevice(dev);
+ }
+
+ if (isNotificationEnabled(dev.deviceId())) {
+ log.warn("establishPersistentSseConnection::notifications already enabled on device: {}",
+ dev.deviceId());
+ return;
+ }
+
+ if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
+ log.warn("This device {} has not yet been subscribed to receive notifications.",
+ dev.deviceId());
+ return;
+ }
+
+ RestconfNotificationEventListenerImpl myListener =
+ new RestconfNotificationEventListenerImpl(info);
+ enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener);
+ }
+
+ @Override
+ public void enableNotifications(DeviceId device, String request,
+ String mediaType,
+ RestconfNotificationEventListener listener) {
+ if (isNotificationEnabled(device)) {
+ log.warn("enableNotifications::already enabled on device: {}", device);
+ return;
+ }
+
+ request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX
+ + request;
+
+ addNotificationListener(device, listener);
+
+ GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType,
+ device);
+ runnableTable.put(device, runnable);
+ executor.execute(runnable);
+ }
+
+ public void stopNotifications(DeviceId device) {
+ try {
+ runnableTable.get(device).terminate();
+ processorRunnableTable.get(device).terminate();
+ } catch (Exception ex) {
+ log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
+ }
+ log.info("stopNotifications::Runnable is now terminated");
+ runnableTable.remove(device);
+ processorRunnableTable.remove(device);
+ restconfNotificationListenerMap.remove(device);
+ log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
+ }
+
+ @Override
+ public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
+ String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
+ URL deleteSubscribeUrl = null;
+ RestSBDevice dev = null;
+ try {
+ deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
+ dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
+ } catch (MalformedURLException e) {
+ log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+ return;
+ }
+
+ String deviceIp = deleteSubscribeUrl.getHost();
+ String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
+ log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
+ deviceIp, devicePort);
+ if (dev == null) {
+ log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
+ return;
+ }
+ String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
+
+ if (subscriptionId != null) {
+ log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
+ log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
+ try {
+ restapiCallNode.sendRequest(paramMap, ctx);
+ if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
+ log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed");
+ stopNotifications(dev.deviceId());
+ subscribedDevicesTable.remove(dev.deviceId());
+
+ String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+ if (id != null) {
+ subscriptionInfoMap.remove(id);
+ }
+
+ } else {
+ log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
+ }
+ } catch (SvcLogicException e) {
+ log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
+ }
+ } else {
+ log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
+ }
+ }
+
+ /**
+ * Notifies providers about incoming RESTCONF notification events.
+ */public class GetChunksRunnable implements Runnable {
+ private String request;
+ private String mediaType;
+ private DeviceId deviceId;
+
+ private volatile boolean running = true;
+
+ public void terminate() {
+ log.info("GetChunksRunnable.terminate()::threadID: {}",
+ Thread.currentThread().getId());
+ running = false;
+ }
+
+ /**
+ * @param request request
+ * @param mediaType media type
+ * @param deviceId device identifier
+ */
+ public GetChunksRunnable(String request, String mediaType,
+ DeviceId deviceId) {
+ this.request = request;
+ this.mediaType = mediaType;
+ this.deviceId = deviceId;
+ }
+
+ @Override
+ public void run() {
+ log.trace("GetChunksRunnable.run()::threadID is: {} ...., running is: {}",
+ Thread.currentThread().getId(), running);
+ try {
+ Client client = ClientBuilder.newBuilder()
+ .register(SseFeature.class).build();
+ WebTarget target = client.target(getUrlString(deviceId, request));
+ log.trace("GetChunksRunnable.run()::target URI is {}", target.getUri().toString());
+ Response response = target.request().get();
+ EventInput eventInput = response.readEntity(EventInput.class);
+ log.trace("GetChunksRunnable.run()::after eventInput");
+ String rcvdData = "";
+ while (!eventInput.isClosed() && running) {
+ log.trace("GetChunksRunnable.run()::inside while ...");
+ final InboundEvent inboundEvent = eventInput.read();
+ log.trace("GetChunksRunnable.run()::after eventInput.read() ...");
+ if (inboundEvent == null) {
+ // connection has been closed
+ log.info("GetChunksRunnable.run()::connection has been closed ...");
+ break;
+ }
+ if (running) {
+ rcvdData = inboundEvent.readData(String.class);
+ BlockingQueue<String> eventQ = getEventQ(deviceId);
+ if (eventQ != null) {
+ eventQ.add(rcvdData);
+ eventQMap.put(deviceId, eventQ);
+ log.trace("GetChunksRunnable.run()::eventQ got filled.");
+ } else {
+ log.error("GetChunksRunnable.run()::eventQ has not been initialized for this device {}",
+ deviceId);
+ }
+ } else {
+ log.info("GetChunksRunnable.run()::running has changed to false while eventInput.read() " +
+ "was blocked to receive new notifications");
+ log.info("GetChunksRunnable.run()::the client is no longer interested to " +
+ "receive notifications.");
+ break;
+ }
+ }
+ if (!running) {
+ log.trace("GetChunksRunnable.run()::running is false! " +
+ "closing eventInput, threadID: {}", Thread.currentThread().getId());
+ eventInput.close();
+ response.close();
+ client.close();
+ log.info("GetChunksRunnable.run()::eventInput is closed in run()");
+ }
+ } catch (Exception ex) {
+ log.info("GetChunksRunnable.run()::We got some exception: {}, threadID: {} ", ex,
+ Thread.currentThread().getId());
+ }
+ log.trace("GetChunksRunnable.run()::after Runnable Try Catch. threadID: {} ",
+ Thread.currentThread().getId());
+ }
+ }
+
+ public class InternalRestconfEventProcessorRunnable implements Runnable {
+
+ private volatile boolean running = true;
+ private DeviceId deviceId;
+
+ public InternalRestconfEventProcessorRunnable(DeviceId deviceId) {
+ this.deviceId = deviceId;
+ }
+
+ public void terminate() {
+ log.info("InternalRestconfEventProcessorRunnable.terminate()::threadID: {}",
+ Thread.currentThread().getId());
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()");
+ while (running) {
+ try {
+ if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
+ log.trace("InternalRestconfEventProcessorRunnable::waiting for take()");
+ if (running) {
+ String eventJsonString = eventQMap.get(deviceId).take();
+ log.trace("InternalRestconfEventProcessorRunnable::after take()");
+ log.info("InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString);
+ Map<String, String> param = convertToProperties(eventJsonString);
+ String idString = param.get("push-change-update.subscription-id");
+ SubscriptionInfo info = subscriptionInfoMap().get(idString);
+ if (info != null) {
+ SvcLogicContext ctx = setContext(param);
+ SvcLogicGraphInfo callbackDG = info.callBackDG();
+ callbackDG.executeGraph(ctx);
+ }
+ } else {
+ log.info("InternalRestconfEventProcessorRunnable.run()::running has changed to false " +
+ "while eventQ was blocked to process new notifications");
+ log.info("InternalRestconfEventProcessorRunnable.run()::" +
+ "the client is no longer interested to receive notifications.");
+ break;
+ }
+ }
+ } catch (InterruptedException | SvcLogicException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ private SvcLogicContext setContext(Map<String, String> param) {
+ SvcLogicContext ctx = new SvcLogicContext();
+ for (Map.Entry<String, String> entry : param.entrySet()) {
+ ctx.setAttribute(entry.getKey(), entry.getValue());
+ }
+ return ctx;
+ }
+ }
+
+ public String discoverRootResource(DeviceId device) {
+ return ROOT_RESOURCE;
+ }
+
+ @Override
+ public void addNotificationListener(DeviceId deviceId,
+ RestconfNotificationEventListener listener) {
+ Set<RestconfNotificationEventListener> listeners =
+ restconfNotificationListenerMap.get(deviceId);
+ if (listeners == null) {
+ listeners = new HashSet<>();
+ }
+
+ listeners.add(listener);
+
+ this.restconfNotificationListenerMap.put(deviceId, listeners);
+ }
+
+ @Override
+ public void removeNotificationListener(DeviceId deviceId,
+ RestconfNotificationEventListener listener) {
+ Set<RestconfNotificationEventListener> listeners =
+ restconfNotificationListenerMap.get(deviceId);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
+
+ public boolean isNotificationEnabled(DeviceId deviceId) {
+ return runnableTable.containsKey(deviceId);
+ }
+
}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java
new file mode 100644
index 000000000..292e17e46
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java
@@ -0,0 +1,15 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+/**
+ * Notifies providers about incoming RESTCONF notification events.
+ */
+public interface RestconfNotificationEventListener<T> {
+
+ /**
+ * Handles the notification event.
+ *
+ * @param deviceId restconf device identifier
+ * @param event event payload
+ */
+ void handleNotificationEvent(DeviceId deviceId, T event);
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java
new file mode 100644
index 000000000..fa2fa02d2
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java
@@ -0,0 +1,22 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class RestconfNotificationEventListenerImpl implements
+ RestconfNotificationEventListener<String> {
+
+ private final Logger log = getLogger(getClass());
+ SubscriptionInfo info;
+
+ public RestconfNotificationEventListenerImpl(SubscriptionInfo info) {
+ this.info = info;
+ }
+
+ @Override
+ public void handleNotificationEvent(DeviceId deviceId, String eventJsonString) {
+ log.info("New notification: {} for device: {}",
+ eventJsonString, deviceId.toString());
+ }
+}
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java
index dfe8cd5b7..972fb2b55 100644
--- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java
@@ -62,6 +62,68 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin {
void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
/**
+ * Allows directed graphs to subscribe to a restconf server to receive notifications from that server.
+ * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+ * <table border="1">
+ * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+ * <tbody>
+ * <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr>
+ * <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td>https://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription</td></tr>
+ * <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td>https://127.0.0.1:8181/restconf/streams/yang-push-json</td></tr>
+ * <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr>
+ * <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td>http://example.com/sample-data/1.0</td></tr>
+ * <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr>
+ * <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr>
+ * <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr>
+ * <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr>
+ * <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr>
+ * <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr>
+ * <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr>
+ * <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr>
+ * <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr>
+ * <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr>
+ * <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr>
+ * </tbody>
+ * </table>
+ * @param ctx Reference to context memory
+ * @throws SvcLogicException
+ * @since 11.0.2
+ * @see String#split(String, int)
+ */
+ void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
+
+ /**
+ * Allows directed graphs to establish a discovery subscription for a given subscriber.
+ * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+ * <table border="1">
+ * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+ * <tbody>
+ * <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr>
+ * <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td>https://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription</td></tr>
+ * <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td>https://127.0.0.1:8181/restconf/streams/yang-push-json</td></tr>
+ * <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr>
+ * <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td>http://example.com/sample-data/1.0</td></tr>
+ * <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr>
+ * <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr>
+ * <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr>
+ * <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr>
+ * <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr>
+ * <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr>
+ * <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr>
+ * <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr>
+ * <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr>
+ * <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr>
+ * <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr>
+ * </tbody>
+ * </table>
+ * @param ctx Reference to context memory
+ * @throws SvcLogicException
+ * @since 11.0.2
+ * @see String#split(String, int)
+ */
+ void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
+
+ /**
* Allows directed graphs to modify a discovery subscription for a given subscriber.
* @param paramMap HashMap<String,String> of parameters passed by the DG to this function
* <table border="1">
@@ -107,4 +169,18 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin {
*/
void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx);
+ /**
+ * Allows directed graphs to unsubscribe from a restconf server and to remove the persistent sse connection.
+ * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+ * <table border="1">
+ * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+ * <tbody>
+ * <tr><td>subscriberId</td><td>Mandatory</td><td>subscription subscriber's identifier</td><td>topologyId/1111</td></tr>
+ * </tbody>
+ * </table>
+ * @param ctx Reference to context memory
+ * @throws SvcLogicException
+ */
+ void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx);
+
}