diff options
author | Hesam Rahimi <hesam.rahimi@huawei.com> | 2022-03-01 22:34:38 -0500 |
---|---|---|
committer | Hesam Rahimi <hesam.rahimi@huawei.com> | 2022-03-02 11:33:07 -0500 |
commit | 04766f01fb17e464d722d21d328264d139a2fcd0 (patch) | |
tree | b5377d404701095e93ff323a6f89934259398f7f /plugins/restconf-client/provider/src/main | |
parent | db3f905491c6a8f844212a4227281e758d73d333 (diff) |
Implementation of a RESTCONF client that can establish SSE connections
to a RESTCONF server. This is for receiving notification events to
support closed-loop for IBNs.
Issue-ID: CCSDK-3595
Signed-off-by: Hesam Rahimi <hesam.rahimi@huawei.com>
Change-Id: I2a3be01b03d889b41d4608011436d8b587a621e5
Diffstat (limited to 'plugins/restconf-client/provider/src/main')
8 files changed, 1093 insertions, 16 deletions
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java new file mode 100644 index 000000000..e0854f43c --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DefaultRestSBDevice.java @@ -0,0 +1,175 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Objects; +import java.util.Optional; + +public class DefaultRestSBDevice implements RestSBDevice{ + private static final String REST = "rest"; + private static final String COLON = ":"; + private final String ip; + private final int port; + private final String username; + private final String password; + private boolean isActive; + private String protocol; + private String url; + private boolean isProxy; + private final Optional<String> testUrl; + private final Optional<String> manufacturer; + private final Optional<String> hwVersion; + private final Optional<String> swVersion; + + public DefaultRestSBDevice(String ip, int port, String name, String password, + String protocol, String url, boolean isActive) { + this(ip, port, name, password, protocol, url, isActive, "", "", "", ""); + } + + public DefaultRestSBDevice(String ip, int port, String name, String password, + String protocol, String url, boolean isActive, String testUrl, String manufacturer, + String hwVersion, + String swVersion) { + Preconditions.checkNotNull(ip, "IP address cannot be null"); + Preconditions.checkArgument(port > 0, "Port address cannot be negative"); + Preconditions.checkNotNull(protocol, "protocol address cannot be null"); + this.ip = ip; + this.port = port; + this.username = name; + this.password = StringUtils.isEmpty(password) ? null : password; + this.isActive = isActive; + this.protocol = protocol; + this.url = StringUtils.isEmpty(url) ? null : url; + this.manufacturer = StringUtils.isEmpty(manufacturer) ? + Optional.empty() : Optional.ofNullable(manufacturer); + this.hwVersion = StringUtils.isEmpty(hwVersion) ? + Optional.empty() : Optional.ofNullable(hwVersion); + this.swVersion = StringUtils.isEmpty(swVersion) ? + Optional.empty() : Optional.ofNullable(swVersion); + this.testUrl = StringUtils.isEmpty(testUrl) ? + Optional.empty() : Optional.ofNullable(testUrl); + if (this.manufacturer.isPresent() + && this.hwVersion.isPresent() + && this.swVersion.isPresent()) { + this.isProxy = true; + } else { + this.isProxy = false; + } + } + + @Override + public String ip() { + return ip; + } + + @Override + public int port() { + return port; + } + + @Override + public String username() { + return username; + } + + @Override + public String password() { + return password; + } + + @Override + public DeviceId deviceId() { + try { + return DeviceId.deviceId(new URI(REST, ip + COLON + port, null)); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Cannot create deviceID " + + REST + COLON + ip + + COLON + port, e); + } + } + + @Override + public void setActive(boolean active) { + isActive = active; + } + + @Override + public boolean isActive() { + return isActive; + } + + @Override + public String protocol() { + return protocol; + } + + @Override + public String url() { + return url; + } + + @Override + public boolean isProxy() { + return isProxy; + } + + @Override + public Optional<String> testUrl() { + return testUrl; + } + + @Override + public Optional<String> manufacturer() { + return manufacturer; + } + + @Override + public Optional<String> hwVersion() { + return hwVersion; + } + + @Override + public Optional<String> swVersion() { + return swVersion; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("url", url) + .add("testUrl", testUrl) + .add("protocol", protocol) + .add("username", username) + .add("port", port) + .add("ip", ip) + .add("manufacturer", manufacturer.orElse(null)) + .add("hwVersion", hwVersion.orElse(null)) + .add("swVersion", swVersion.orElse(null)) + .toString(); + + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof RestSBDevice)) { + return false; + } + RestSBDevice device = (RestSBDevice) obj; + return this.username.equals(device.username()) && this.ip.equals(device.ip()) && + this.port == device.port(); + + } + + @Override + public int hashCode() { + return Objects.hash(ip, port); + } +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java new file mode 100644 index 000000000..f0df0242a --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/DeviceId.java @@ -0,0 +1,86 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.net.URI; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; + +public class DeviceId { + /** + * Represents either no device, or an unspecified device. + */ + public static final DeviceId NONE = deviceId("none:none"); + + private static final int DEVICE_ID_MAX_LENGTH = 1024; + + private final URI uri; + private final String str; + + // Public construction is prohibited + private DeviceId(URI uri) { + this.uri = uri; + this.str = uri.toString().toLowerCase(); + } + + + // Default constructor for serialization + protected DeviceId() { + this.uri = null; + this.str = null; + } + + /** + * Creates a device id using the supplied URI. + * + * @param uri device URI + * @return DeviceId + */ + public static DeviceId deviceId(URI uri) { + return new DeviceId(uri); + } + + /** + * Creates a device id using the supplied URI string. + * + * @param string device URI string + * @return DeviceId + */ + public static DeviceId deviceId(String string) { + checkArgument(string.length() <= DEVICE_ID_MAX_LENGTH, + "deviceId exceeds maximum length " + DEVICE_ID_MAX_LENGTH); + return deviceId(URI.create(string)); + } + + /** + * Returns the backing URI. + * + * @return backing URI + */ + public URI uri() { + return uri; + } + + @Override + public int hashCode() { + return str.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof DeviceId) { + final DeviceId that = (DeviceId) obj; + return this.getClass() == that.getClass() && + Objects.equals(this.str, that.str); + } + return false; + } + + @Override + public String toString() { + return str; + } + +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java new file mode 100644 index 000000000..aa36a3955 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestConfSBController.java @@ -0,0 +1,93 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.util.Map; + +/** + * Abstraction of a RESTCONF controller. Serves as a one stop shop for obtaining + * RESTCONF southbound devices and (un)register listeners. + */ +public interface RestConfSBController { + + /** + * Returns all the devices known to this controller. + * + * @return map of devices + */ + Map<DeviceId, RestSBDevice> getDevices(); + + /** + * Returns a device by node identifier. + * + * @param deviceInfo node identifier + * @return RestSBDevice rest device + */ + RestSBDevice getDevice(DeviceId deviceInfo); + + /** + * Returns a device by Ip and Port. + * + * @param ip device ip + * @param port device port + * @return RestSBDevice rest device + */ + RestSBDevice getDevice(String ip, int port); + + /** + * Adds a device to the device map. + * + * @param device to be added + */ + void addDevice(RestSBDevice device); + + /** + * Removes the device from the devices map. + * + * @param deviceId to be removed + */ + void removeDevice(DeviceId deviceId); + + /** + * This method is to be called by whoever is interested to receive + * Notifications from a specific device. It does a REST GET request + * with specified parameters to the device, and calls the provided + * callBackListener upon receiving notifications to notify the requester + * about notifications. + * + * @param device device to make the request to + * @param request url of the request + * @param mediaType format to retrieve the content in + * @param callBackListener method to call when notifications arrives + */ + void enableNotifications(DeviceId device, String request, String mediaType, + RestconfNotificationEventListener callBackListener); + + /** + * Registers a listener for notification events that occur to restconf + * devices. + * + * @param deviceId identifier of the device to which the listener is attached + * @param listener the listener to notify + */ + void addNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener); + + /** + * Unregisters the listener for the device. + * + * @param deviceId identifier of the device for which the listener + * is to be removed + * @param listener listener to be removed + */ + void removeNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener); + + /** + * Returns true if a listener has been installed to listen to RESTCONF + * notifications sent from a particular device. + * + * @param deviceId identifier of the device from which the notifications + * are generated + * @return true if listener is installed; false otherwise + */ + boolean isNotificationEnabled(DeviceId deviceId); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java new file mode 100644 index 000000000..206b02e90 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestSBDevice.java @@ -0,0 +1,104 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import java.util.Optional; + +public interface RestSBDevice { + + /** + * Returns the ip of this device. + * + * @return ip + */ + String ip(); + + /** + * Returns the password of this device. + * + * @return port + */ + int port(); + + /** + * Returns the username of this device. + * + * @return username + */ + String username(); + + /** + * Returns the password of this device. + * + * @return password + */ + String password(); + + /** + * Returns the ONOS deviceID for this device. + * + * @return DeviceId + */ + DeviceId deviceId(); + + /** + * Sets or unsets the state of the device. + * + * @param active boolean + */ + void setActive(boolean active); + + /** + * Returns the state of this device. + * + * @return state + */ + boolean isActive(); + + /** + * Returns the protocol for the REST request, usually HTTP o HTTPS. + * + * @return protocol + */ + String protocol(); + + /** + * Returns the url for the REST requests, to be used instead of IP and PORT. + * + * @return url + */ + String url(); + + /** + * Returns the proxy state of this device + * (if true, the device is proxying multiple ONOS devices). + * @return proxy state + */ + boolean isProxy(); + + /** + * Returns the url for the REST TEST requests. + * + * @return testUrl + */ + Optional<String> testUrl(); + + /** + * The manufacturer of the rest device. + * + * @return the name of the manufacturer + */ + Optional<String> manufacturer(); + + /** + * The hardware version of the rest device. + * + * @return the hardware version + */ + Optional<String> hwVersion(); + + /** + * The software version of rest device. + * + * @return the software version. + */ + Optional<String> swVersion(); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java index d6b93f744..bda6854c5 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java @@ -20,7 +20,12 @@ package org.onap.ccsdk.sli.plugins.restconfdiscovery; +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.glassfish.jersey.media.sse.EventInput; import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.InboundEvent; import org.glassfish.jersey.media.sse.SseFeature; import org.onap.ccsdk.sli.core.sli.SvcLogicContext; import org.onap.ccsdk.sli.core.sli.SvcLogicException; @@ -29,6 +34,7 @@ import org.onap.ccsdk.sli.plugins.restapicall.Parameters; import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode; import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -36,40 +42,63 @@ import javax.net.ssl.X509TrustManager; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.net.MalformedURLException; +import java.net.URL; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.HashSet; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.*; +import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties; import static org.slf4j.LoggerFactory.getLogger; /** * Representation of a plugin to subscribe for notification and then * to handle the received notifications. */ -public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { +public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin { private static final Logger log = getLogger(RestconfDiscoveryNode.class); - private ExecutorService executor = Executors.newCachedThreadPool(); - private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>(); - private RestconfApiCallNode restconfApiCallNode; - - private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>(); - private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(); - + private static final String ROOT_RESOURCE = "/restconf"; private static final String SUBSCRIBER_ID = "subscriberId"; private static final String RESPONSE_CODE = "response-code"; private static final String RESPONSE_PREFIX = "responsePrefix"; private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" + "ications:establish-subscription.output.identifier"; + private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier"; private static final String RESPONSE_CODE_200 = "200"; private static final String SSE_URL = "sseConnectURL"; + private static final String REST_API_URL = "restapiUrl"; + private static final String RESOURCE_PATH_PREFIX = "/data/"; + private static final String NOTIFICATION_PATH_PREFIX = "/streams/"; + private static final String DEVICE_IP = "deviceIp"; + private static final String DEVICE_PORT = "devicePort"; + private static final String DOUBLESLASH = "//"; + private static final String COLON = ":"; + + private RestconfApiCallNode restconfApiCallNode; + private RestapiCallNode restapiCallNode = new RestapiCallNode(); + private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>(); + private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(); + private Map<DeviceId, Set<RestconfNotificationEventListener>> + restconfNotificationListenerMap = new ConcurrentHashMap<>(); + private Map<DeviceId, GetChunksRunnable> + runnableTable = new ConcurrentHashMap<>(); + private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>(); + private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>(); + private Map<DeviceId, InternalRestconfEventProcessorRunnable> + processorRunnableTable = new ConcurrentHashMap<>(); + private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>(); + private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>(); + private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>(); + private ExecutorService executor = Executors.newCachedThreadPool(); /** * Creates an instance of RestconfDiscoveryNode and starts processing of @@ -78,15 +107,92 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { * @param r restconf api call node */ public RestconfDiscoveryNode(RestconfApiCallNode r) { + log.info("inside RestconfDiscoveryNode Constructor"); this.restconfApiCallNode = r; - ExecutorService e = Executors.newFixedThreadPool(20); - EventProcessor p = new EventProcessor(this); - for (int i = 0; i < 20; ++i) { - e.execute(p); + this.activate(); +// ExecutorService e = Executors.newFixedThreadPool(20); +// EventProcessor p = new EventProcessor(this); +// for (int i = 0; i < 20; ++i) { +// e.execute(p); +// } + } + + public void activate() { + log.info("RESTCONF SBI Started"); + } + + public void deactivate() { + log.info("RESTCONF SBI Stopped"); + executor.shutdown(); + this.getClientMap().clear(); + this.getDeviceMap().clear(); + } + + public Map<DeviceId, RestSBDevice> getDeviceMap() { + return deviceMap; + } + + public Map<DeviceId, Client> getClientMap() { + return clientMap; + } + + @Override + public Map<DeviceId, RestSBDevice> getDevices() { + log.trace("RESTCONF SBI::getDevices"); + return ImmutableMap.copyOf(deviceMap); + } + + @Override + public RestSBDevice getDevice(DeviceId deviceInfo) { + log.trace("RESTCONF SBI::getDevice with deviceId"); + return deviceMap.get(deviceInfo); + } + + @Override + public RestSBDevice getDevice(String ip, int port) { + log.trace("RESTCONF SBI::getDevice with ip and port"); + try { + if (!deviceMap.isEmpty()) { + return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get(); + } + } catch (NoSuchElementException noSuchElementException) { + log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port); + } + return null; + } + + @Override + public void addDevice(RestSBDevice device) { + log.trace("RESTCONF SBI::addDevice"); + if (!deviceMap.containsKey(device.deviceId())) { + if (device.username() != null) { + String username = device.username(); + String password = device.password() == null ? "" : device.password(); + // authenticate(client, username, password); + } + BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>(); + eventQMap.put(device.deviceId(), newBlockingQueue); + InternalRestconfEventProcessorRunnable eventProcessorRunnable = + new InternalRestconfEventProcessorRunnable(device.deviceId()); + processorRunnableTable.put(device.deviceId(), eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable is created and is going for execute"); + executor.execute(eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable was sent for execute"); + deviceMap.put(device.deviceId(), device); + } else { + log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId()); } } @Override + public void removeDevice(DeviceId deviceId) { + log.trace("RESTCONF SBI::removeDevice"); + eventQMap.remove(deviceId); + clientMap.remove(deviceId); + deviceMap.remove(deviceId); + } + + @Override public void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException { String subscriberId = paramMap.get(SUBSCRIBER_ID); @@ -107,6 +213,62 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { } @Override + public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx) + throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + if (subscriberId == null) { + throw new SvcLogicException("Subscriber Id is null"); + } + + String subscribeUrlString = paramMap.get(REST_API_URL); + URL subscribeUrl = null; + RestSBDevice dev = null; + try { + subscribeUrl = new URL(subscribeUrlString); + dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now."); + //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap + dev = new DefaultRestSBDevice(subscribeUrl.getHost(), + subscribeUrl.getPort(), "onos", "rocks", "http", + subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true); + this.addDevice(dev); + } + + if (!subscribedDevicesTable.containsKey(dev.deviceId())) { + log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " + + "Trying to subscribe it now..."); + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + // TODO: save subscription id and subscriber in MYSQL + String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx); + log.info("establishSubscriptionOnly::Subscription is done successfully and " + + "the output.identifier is: {}", id); + log.info("establishSubscriptionOnly::The subscriptionID returned by the server " + + "does not exist in the map. Adding it now..."); + subscribedDevicesTable.put(dev.deviceId(), id); + + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriptionId(id); + info.subscriberId(subscriberId); + subscriptionInfoMap.put(id, info); + + } + } + + } + + @Override public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) { // TODO: to be implemented } @@ -252,6 +414,10 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE); } + String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX); + } + /** * Returns subscription id from event. * @@ -278,6 +444,34 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { return null; } + private String getUrlString(DeviceId deviceId, String request) { + RestSBDevice restSBDevice = deviceMap.get(deviceId); + if (restSBDevice == null) { + log.warn("getUrlString::restSbDevice cannot be NULL!"); + return ""; + } + if (restSBDevice.url() != null) { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request; + } else { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString() + + COLON + restSBDevice.port() + request; + } + } + + private String getSubscriptionIdFromDeviceId(DeviceId deviceId) { + if (subscribedDevicesTable.containsKey(deviceId)) { + return subscribedDevicesTable.get(deviceId); + } + return null; + } + + private BlockingQueue<String> getEventQ(DeviceId deviceId) { + if (eventQMap.containsKey(deviceId)) { + return eventQMap.get(deviceId); + } + return null; + } + /** * Returns restconfApiCallNode. * @@ -311,4 +505,316 @@ public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin { void eventQueue(LinkedBlockingQueue<String> eventQueue) { this.eventQueue = eventQueue; } + + /** + * Establishes a persistent SSE connection between the client and the server. + * + * @param paramMap input paramter map + * @param ctx service logic context + */ + @Override + public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException { + + //TODO: FIXME: remove the instantiation of info; not useful + String subscriberId = paramMap.get(SUBSCRIBER_ID); + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriberId(subscriberId); + + String sseUrlString = paramMap.get(SSE_URL); + URL sseUrl = null; + RestSBDevice dev = null; + try { + sseUrl = new URL(sseUrlString); + dev = getDevice(sseUrl.getHost(), sseUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now."); + dev = new DefaultRestSBDevice(sseUrl.getHost(), + sseUrl.getPort(), "onos", "rocks", "http", + sseUrl.getHost() + ":" + sseUrl.getPort(), true); + this.addDevice(dev); + } + + if (isNotificationEnabled(dev.deviceId())) { + log.warn("establishPersistentSseConnection::notifications already enabled on device: {}", + dev.deviceId()); + return; + } + + if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) { + log.warn("This device {} has not yet been subscribed to receive notifications.", + dev.deviceId()); + return; + } + + RestconfNotificationEventListenerImpl myListener = + new RestconfNotificationEventListenerImpl(info); + enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener); + } + + @Override + public void enableNotifications(DeviceId device, String request, + String mediaType, + RestconfNotificationEventListener listener) { + if (isNotificationEnabled(device)) { + log.warn("enableNotifications::already enabled on device: {}", device); + return; + } + + request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX + + request; + + addNotificationListener(device, listener); + + GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType, + device); + runnableTable.put(device, runnable); + executor.execute(runnable); + } + + public void stopNotifications(DeviceId device) { + try { + runnableTable.get(device).terminate(); + processorRunnableTable.get(device).terminate(); + } catch (Exception ex) { + log.error("stopNotifications::Exception happened when terminating, ex: {}", ex); + } + log.info("stopNotifications::Runnable is now terminated"); + runnableTable.remove(device); + processorRunnableTable.remove(device); + restconfNotificationListenerMap.remove(device); + log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString()); + } + + @Override + public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) { + String deleteSubscribeUrlString = paramMap.get(REST_API_URL); + URL deleteSubscribeUrl = null; + RestSBDevice dev = null; + try { + deleteSubscribeUrl = new URL(deleteSubscribeUrlString); + dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + String deviceIp = deleteSubscribeUrl.getHost(); + String devicePort = String.valueOf(deleteSubscribeUrl.getPort()); + log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}", + deviceIp, devicePort); + if (dev == null) { + log.error("deleteSubscriptionAndSseConnection::device does not exist in the map"); + return; + } + String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId()); + + if (subscriptionId != null) { + log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId); + log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request"); + try { + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed"); + stopNotifications(dev.deviceId()); + subscribedDevicesTable.remove(dev.deviceId()); + + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + subscriptionInfoMap.remove(id); + } + + } else { + log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull"); + } + } catch (SvcLogicException e) { + log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e); + } + } else { + log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed"); + } + } + + /** + * Notifies providers about incoming RESTCONF notification events. + */public class GetChunksRunnable implements Runnable { + private String request; + private String mediaType; + private DeviceId deviceId; + + private volatile boolean running = true; + + public void terminate() { + log.info("GetChunksRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + /** + * @param request request + * @param mediaType media type + * @param deviceId device identifier + */ + public GetChunksRunnable(String request, String mediaType, + DeviceId deviceId) { + this.request = request; + this.mediaType = mediaType; + this.deviceId = deviceId; + } + + @Override + public void run() { + log.trace("GetChunksRunnable.run()::threadID is: {} ...., running is: {}", + Thread.currentThread().getId(), running); + try { + Client client = ClientBuilder.newBuilder() + .register(SseFeature.class).build(); + WebTarget target = client.target(getUrlString(deviceId, request)); + log.trace("GetChunksRunnable.run()::target URI is {}", target.getUri().toString()); + Response response = target.request().get(); + EventInput eventInput = response.readEntity(EventInput.class); + log.trace("GetChunksRunnable.run()::after eventInput"); + String rcvdData = ""; + while (!eventInput.isClosed() && running) { + log.trace("GetChunksRunnable.run()::inside while ..."); + final InboundEvent inboundEvent = eventInput.read(); + log.trace("GetChunksRunnable.run()::after eventInput.read() ..."); + if (inboundEvent == null) { + // connection has been closed + log.info("GetChunksRunnable.run()::connection has been closed ..."); + break; + } + if (running) { + rcvdData = inboundEvent.readData(String.class); + BlockingQueue<String> eventQ = getEventQ(deviceId); + if (eventQ != null) { + eventQ.add(rcvdData); + eventQMap.put(deviceId, eventQ); + log.trace("GetChunksRunnable.run()::eventQ got filled."); + } else { + log.error("GetChunksRunnable.run()::eventQ has not been initialized for this device {}", + deviceId); + } + } else { + log.info("GetChunksRunnable.run()::running has changed to false while eventInput.read() " + + "was blocked to receive new notifications"); + log.info("GetChunksRunnable.run()::the client is no longer interested to " + + "receive notifications."); + break; + } + } + if (!running) { + log.trace("GetChunksRunnable.run()::running is false! " + + "closing eventInput, threadID: {}", Thread.currentThread().getId()); + eventInput.close(); + response.close(); + client.close(); + log.info("GetChunksRunnable.run()::eventInput is closed in run()"); + } + } catch (Exception ex) { + log.info("GetChunksRunnable.run()::We got some exception: {}, threadID: {} ", ex, + Thread.currentThread().getId()); + } + log.trace("GetChunksRunnable.run()::after Runnable Try Catch. threadID: {} ", + Thread.currentThread().getId()); + } + } + + public class InternalRestconfEventProcessorRunnable implements Runnable { + + private volatile boolean running = true; + private DeviceId deviceId; + + public InternalRestconfEventProcessorRunnable(DeviceId deviceId) { + this.deviceId = deviceId; + } + + public void terminate() { + log.info("InternalRestconfEventProcessorRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + @Override + public void run() { + log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()"); + while (running) { + try { + if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) { + log.trace("InternalRestconfEventProcessorRunnable::waiting for take()"); + if (running) { + String eventJsonString = eventQMap.get(deviceId).take(); + log.trace("InternalRestconfEventProcessorRunnable::after take()"); + log.info("InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString); + Map<String, String> param = convertToProperties(eventJsonString); + String idString = param.get("push-change-update.subscription-id"); + SubscriptionInfo info = subscriptionInfoMap().get(idString); + if (info != null) { + SvcLogicContext ctx = setContext(param); + SvcLogicGraphInfo callbackDG = info.callBackDG(); + callbackDG.executeGraph(ctx); + } + } else { + log.info("InternalRestconfEventProcessorRunnable.run()::running has changed to false " + + "while eventQ was blocked to process new notifications"); + log.info("InternalRestconfEventProcessorRunnable.run()::" + + "the client is no longer interested to receive notifications."); + break; + } + } + } catch (InterruptedException | SvcLogicException e) { + e.printStackTrace(); + } + } + } + private SvcLogicContext setContext(Map<String, String> param) { + SvcLogicContext ctx = new SvcLogicContext(); + for (Map.Entry<String, String> entry : param.entrySet()) { + ctx.setAttribute(entry.getKey(), entry.getValue()); + } + return ctx; + } + } + + public String discoverRootResource(DeviceId device) { + return ROOT_RESOURCE; + } + + @Override + public void addNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + Set<RestconfNotificationEventListener> listeners = + restconfNotificationListenerMap.get(deviceId); + if (listeners == null) { + listeners = new HashSet<>(); + } + + listeners.add(listener); + + this.restconfNotificationListenerMap.put(deviceId, listeners); + } + + @Override + public void removeNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + Set<RestconfNotificationEventListener> listeners = + restconfNotificationListenerMap.get(deviceId); + if (listeners != null) { + listeners.remove(listener); + } + } + + public boolean isNotificationEnabled(DeviceId deviceId) { + return runnableTable.containsKey(deviceId); + } + } diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java new file mode 100644 index 000000000..292e17e46 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListener.java @@ -0,0 +1,15 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +/** + * Notifies providers about incoming RESTCONF notification events. + */ +public interface RestconfNotificationEventListener<T> { + + /** + * Handles the notification event. + * + * @param deviceId restconf device identifier + * @param event event payload + */ + void handleNotificationEvent(DeviceId deviceId, T event); +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java new file mode 100644 index 000000000..fa2fa02d2 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfNotificationEventListenerImpl.java @@ -0,0 +1,22 @@ +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +public class RestconfNotificationEventListenerImpl implements + RestconfNotificationEventListener<String> { + + private final Logger log = getLogger(getClass()); + SubscriptionInfo info; + + public RestconfNotificationEventListenerImpl(SubscriptionInfo info) { + this.info = info; + } + + @Override + public void handleNotificationEvent(DeviceId deviceId, String eventJsonString) { + log.info("New notification: {} for device: {}", + eventJsonString, deviceId.toString()); + } +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java index dfe8cd5b7..972fb2b55 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java @@ -62,6 +62,68 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException; /** + * Allows directed graphs to subscribe to a restconf server to receive notifications from that server. + * @param paramMap HashMap<String,String> of parameters passed by the DG to this function + * <table border="1"> + * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead> + * <tbody> + * <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr> + * <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td>https://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription</td></tr> + * <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td>https://127.0.0.1:8181/restconf/streams/yang-push-json</td></tr> + * <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr> + * <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td>http://example.com/sample-data/1.0</td></tr> + * <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr> + * <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr> + * <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr> + * <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr> + * <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr> + * <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr> + * <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr> + * <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr> + * <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr> + * <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr> + * <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr> + * </tbody> + * </table> + * @param ctx Reference to context memory + * @throws SvcLogicException + * @since 11.0.2 + * @see String#split(String, int) + */ + void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException; + + /** + * Allows directed graphs to establish a discovery subscription for a given subscriber. + * @param paramMap HashMap<String,String> of parameters passed by the DG to this function + * <table border="1"> + * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead> + * <tbody> + * <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr> + * <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td>https://127.0.0.1:8181/restconf/operations/ietf-subscribed-notifications:establish-subscription</td></tr> + * <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td>https://127.0.0.1:8181/restconf/streams/yang-push-json</td></tr> + * <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr> + * <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td>http://example.com/sample-data/1.0</td></tr> + * <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr> + * <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr> + * <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr> + * <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr> + * <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr> + * <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr> + * <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr> + * <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr> + * <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr> + * <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr> + * <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr> + * </tbody> + * </table> + * @param ctx Reference to context memory + * @throws SvcLogicException + * @since 11.0.2 + * @see String#split(String, int) + */ + void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException; + + /** * Allows directed graphs to modify a discovery subscription for a given subscriber. * @param paramMap HashMap<String,String> of parameters passed by the DG to this function * <table border="1"> @@ -107,4 +169,18 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { */ void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx); + /** + * Allows directed graphs to unsubscribe from a restconf server and to remove the persistent sse connection. + * @param paramMap HashMap<String,String> of parameters passed by the DG to this function + * <table border="1"> + * <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead> + * <tbody> + * <tr><td>subscriberId</td><td>Mandatory</td><td>subscription subscriber's identifier</td><td>topologyId/1111</td></tr> + * </tbody> + * </table> + * @param ctx Reference to context memory + * @throws SvcLogicException + */ + void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx); + } |