From cc105abcb5a70b4418241ffdabc9ec9fc7546293 Mon Sep 17 00:00:00 2001 From: Hesam Rahimi Date: Fri, 2 Sep 2022 16:41:09 -0400 Subject: Implementing a mechanism to support periodic pulling of Performance Monitoring (PM) data from 3rd party controllers. Issue-ID: CCSDK-3752 Signed-off-by: Hesam Rahimi Change-Id: I6ee692d1ab370bf9ad3f2f88db63efc2a124b87c --- .../restconfdiscovery/PeriodicDiscoveryNode.java | 681 +++++++++++++++++++++ .../restconfdiscovery/RestconfDiscoveryNode.java | 5 + .../restconfdiscovery/SvcLogicDiscoveryPlugin.java | 30 + 3 files changed, 716 insertions(+) create mode 100644 plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java (limited to 'plugins/restconf-client/provider/src/main/java') diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java new file mode 100644 index 000000000..220de8790 --- /dev/null +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java @@ -0,0 +1,681 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - CCSDK + * ================================================================================ + * Copyright (C) 2018 Huawei Technologies Co., Ltd. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.plugins.restconfdiscovery; + +import com.google.common.collect.ImmutableMap; +import org.glassfish.jersey.media.sse.EventInput; +import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.InboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; +import org.onap.ccsdk.sli.core.sli.SvcLogicContext; +import org.onap.ccsdk.sli.core.sli.SvcLogicException; +import org.onap.ccsdk.sli.core.utils.common.AcceptIpAddressHostNameVerifier; +import org.onap.ccsdk.sli.plugins.restapicall.Parameters; +import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode; +import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.*; + +import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Representation of a plugin to subscribe for notification and then + * to handle the received notifications. + */ +public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin { + + private static final Logger log = getLogger(PeriodicDiscoveryNode.class); + + private static final String ROOT_RESOURCE = "/restconf"; + private static final String SUBSCRIBER_ID = "subscriberId"; + private static final String RESPONSE_CODE = "response-code"; + private static final String RESPONSE_PREFIX = "responsePrefix"; + private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" + + "ications:establish-subscription.output.identifier"; + private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier"; + private static final String RESPONSE_CODE_200 = "200"; + private static final String SSE_URL = "sseConnectURL"; + private static final String PERIODIC_PUL_URL = "periodicPullURL"; + private static final String REST_API_URL = "restapiUrl"; + private static final String RESOURCE_PATH_PREFIX = "/data/"; + private static final String NOTIFICATION_PATH_PREFIX = "/streams/"; + private static final String DEVICE_IP = "deviceIp"; + private static final String DEVICE_PORT = "devicePort"; + private static final String DOUBLESLASH = "//"; + private static final String COLON = ":"; + + private RestconfApiCallNode restconfApiCallNode; + private RestapiCallNode restapiCallNode = new RestapiCallNode(); + private volatile Map subscriptionInfoMap = new ConcurrentHashMap<>(); + private volatile LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); + private Map periodicRunnableTable = new ConcurrentHashMap<>(); + private Map subscribedDevicesTable = new ConcurrentHashMap<>(); + private Map> eventQMap = new ConcurrentHashMap<>(); + private Map + processorRunnableTable = new ConcurrentHashMap<>(); + private final Map deviceMap = new ConcurrentHashMap<>(); + private final Map clientMap = new ConcurrentHashMap<>(); + private ExecutorService executor = Executors.newCachedThreadPool(); + private ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); + + /** + * Creates an instance of RestconfDiscoveryNode and starts processing of + * event. + * + * @param r restconf api call node + */ + public PeriodicDiscoveryNode(RestconfApiCallNode r) { + log.info("inside RestconfDiscoveryNode Constructor"); + this.restconfApiCallNode = r; + this.activate(); +// ExecutorService e = Executors.newFixedThreadPool(20); +// EventProcessor p = new EventProcessor(this); +// for (int i = 0; i < 20; ++i) { +// e.execute(p); +// } + } + + public void activate() { + log.info("RESTCONF SBI Started"); + } + + public void deactivate() { + log.info("RESTCONF SBI Stopped"); + executor.shutdown(); + this.getClientMap().clear(); + this.getDeviceMap().clear(); + } + + public Map getDeviceMap() { + return deviceMap; + } + + public Map getClientMap() { + return clientMap; + } + + @Override + public Map getDevices() { + log.trace("RESTCONF SBI::getDevices"); + return ImmutableMap.copyOf(deviceMap); + } + + @Override + public RestSBDevice getDevice(DeviceId deviceInfo) { + log.trace("RESTCONF SBI::getDevice with deviceId"); + return deviceMap.get(deviceInfo); + } + + @Override + public RestSBDevice getDevice(String ip, int port) { + log.trace("RESTCONF SBI::getDevice with ip and port"); + try { + if (!deviceMap.isEmpty()) { + return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get(); + } + } catch (NoSuchElementException noSuchElementException) { + log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port); + } + return null; + } + + @Override + public void addDevice(RestSBDevice device) { + log.trace("RESTCONF SBI::addDevice"); + if (!deviceMap.containsKey(device.deviceId())) { + if (device.username() != null) { + String username = device.username(); + String password = device.password() == null ? "" : device.password(); + // authenticate(client, username, password); + } + BlockingQueue newBlockingQueue = new LinkedBlockingQueue<>(); + eventQMap.put(device.deviceId(), newBlockingQueue); + InternalPeriodicPullingProcessorRunnable eventProcessorRunnable = + new InternalPeriodicPullingProcessorRunnable(device.deviceId()); + processorRunnableTable.put(device.deviceId(), eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable is created and is going for execute"); + executor.execute(eventProcessorRunnable); + log.trace("addDevice::restconf event processor runnable was sent for execute"); + deviceMap.put(device.deviceId(), device); + } else { + log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId()); + } + } + + @Override + public void removeDevice(DeviceId deviceId) { + log.trace("RESTCONF SBI::removeDevice"); + eventQMap.remove(deviceId); + clientMap.remove(deviceId); + deviceMap.remove(deviceId); + } + + @Override + public void establishSubscription(Map paramMap, + SvcLogicContext ctx) throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + if (subscriberId == null) { + throw new SvcLogicException("Subscriber Id is null"); + } + + restconfApiCallNode.sendRequest(paramMap, ctx); + + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + // TODO: save subscription id and subscriber in MYSQL + + establishPersistentConnection(paramMap, ctx, subscriberId); + } else { + log.info("Failed to subscribe {}", subscriberId); + throw new SvcLogicException(ctx.getAttribute(RESPONSE_CODE)); + } + } + + @Override + public void establishSubscriptionOnly(Map paramMap, SvcLogicContext ctx) + throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + if (subscriberId == null) { + throw new SvcLogicException("Subscriber Id is null"); + } + + String subscribeUrlString = paramMap.get(REST_API_URL); + URL subscribeUrl = null; + RestSBDevice dev = null; + try { + subscribeUrl = new URL(subscribeUrlString); + dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now."); + //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap + dev = new DefaultRestSBDevice(subscribeUrl.getHost(), + subscribeUrl.getPort(), "onos", "rocks", "http", + subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true); + this.addDevice(dev); + } + + if (!subscribedDevicesTable.containsKey(dev.deviceId())) { + log.info("establishSubscriptionOnly::The device {} has not been subscribed yet. " + + "Trying to subscribe it now..."); + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + // TODO: save subscription id and subscriber in MYSQL + String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx); + log.info("establishSubscriptionOnly::Subscription is done successfully and " + + "the output.identifier is: {}", id); + log.info("establishSubscriptionOnly::The subscriptionID returned by the server " + + "does not exist in the map. Adding it now..."); + subscribedDevicesTable.put(dev.deviceId(), id); + + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriptionId(id); + info.subscriberId(subscriberId); + subscriptionInfoMap.put(id, info); + + } + } + + } + + @Override + public void modifySubscription(Map paramMap, SvcLogicContext ctx) { + // TODO: to be implemented + } + + @Override + public void deleteSubscription(Map paramMap, SvcLogicContext ctx) { + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + subscriptionInfoMap.remove(id); + } + } + + protected String getTokenId(String customHttpHeaders) { + if (customHttpHeaders.contains("=")) { + String[] s = customHttpHeaders.split("="); + return s[1]; + } + return customHttpHeaders; + } + + protected WebTarget addToken(WebTarget target, String customHttpHeaders) { + if (customHttpHeaders == null) { + return target; + } + + return new AdditionalHeaderWebTarget( + target, getTokenId(customHttpHeaders)); + } + + /** + * Establishes a persistent between the client and server. + * + * @param paramMap input paramter map + * @param ctx service logic context + * @param subscriberId subscriber identifier + */ + void establishPersistentConnection(Map paramMap, SvcLogicContext ctx, + String subscriberId) { + } + + /** + * Returns response code. + * + * @param prefix prefix given in input parameter + * @param ctx service logic context + * @return response code + */ + String getResponseCode(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE); + } + + String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX); + } + + /** + * Returns subscription id from event. + * + * @param prefix prefix given in input parameter + * @param ctx service logic context + * @return subscription id from event + */ + String getOutputIdentifier(String prefix, SvcLogicContext ctx) { + return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER); + } + + private String getPrefix(String prefix) { + return prefix != null ? prefix + "." : ""; + } + + private String getSubscriptionId(String subscriberId) { + for (Map.Entry entry + : subscriptionInfoMap.entrySet()) { + if (entry.getValue().subscriberId() + .equals(subscriberId)) { + return entry.getKey(); + } + } + return null; + } + + private String getUrlString(DeviceId deviceId, String request) { + RestSBDevice restSBDevice = deviceMap.get(deviceId); + if (restSBDevice == null) { + log.warn("getUrlString::restSbDevice cannot be NULL!"); + return ""; + } + if (restSBDevice.url() != null) { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request; + } else { + return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString() + + COLON + restSBDevice.port() + request; + } + } + + private String getSubscriptionIdFromDeviceId(DeviceId deviceId) { + if (subscribedDevicesTable.containsKey(deviceId)) { + return subscribedDevicesTable.get(deviceId); + } + return null; + } + + private BlockingQueue getEventQ(DeviceId deviceId) { + if (eventQMap.containsKey(deviceId)) { + return eventQMap.get(deviceId); + } + return null; + } + + /** + * Returns restconfApiCallNode. + * + * @return restconfApiCallNode + */ + protected RestconfApiCallNode restconfapiCallNode() { + return restconfApiCallNode; + } + + /** + * Sets restconfApiCallNode. + * + * @param node restconfApiCallNode + */ + void restconfapiCallNode(RestconfApiCallNode node) { + restconfApiCallNode = node; + } + + Map subscriptionInfoMap() { + return subscriptionInfoMap; + } + + void subscriptionInfoMap(Map subscriptionInfoMap) { + this.subscriptionInfoMap = subscriptionInfoMap; + } + + LinkedBlockingQueue eventQueue() { + return eventQueue; + } + + void eventQueue(LinkedBlockingQueue eventQueue) { + this.eventQueue = eventQueue; + } + + /** + * Establishes a persistent SSE connection between the client and the server. + * + * @param paramMap input paramter map + * @param ctx service logic context + */ + @Override + public void establishPersistentSseConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException { + + } + + @Override + public void establishPeriodicPullConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException { + String subscriberId = paramMap.get(SUBSCRIBER_ID); + SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"), + paramMap.get("rpc"), + paramMap.get("version"), + paramMap.get("mode")); + SubscriptionInfo info = new SubscriptionInfo(); + info.callBackDG(callbackDG); + info.subscriberId(subscriberId); + + String periodicPullUrlString = paramMap.get(PERIODIC_PUL_URL); + URL periodicPullUrl = null; + RestSBDevice dev = null; + try { + periodicPullUrl = new URL(periodicPullUrlString); + dev = getDevice(periodicPullUrl.getHost(), periodicPullUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e); + return; + } + + if (dev == null) { + log.warn("establishPeriodicPullConnection::device does not exist in the map. Trying to add one now."); + dev = new DefaultRestSBDevice(periodicPullUrl.getHost(), + periodicPullUrl.getPort(), "onos", "rocks", "http", + periodicPullUrl.getHost() + ":" + periodicPullUrl.getPort(), true); + this.addDevice(dev); + } + + if (isNotificationEnabled(dev.deviceId())) { + log.warn("establishPeriodicPullConnection::notifications already enabled on device: {}", + dev.deviceId()); + return; + } + + if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) { + log.warn("This device {} has not yet been subscribed to receive notifications.", + dev.deviceId()); + return; + } + + RestconfNotificationEventListenerImpl myListener = + new RestconfNotificationEventListenerImpl(info); + enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener); + } + + @Override + public void enableNotifications(DeviceId device, String request, + String mediaType, + RestconfNotificationEventListener listener) { + if (isNotificationEnabled(device)) { + log.warn("enableNotifications::already enabled on device: {}", device); + return; + } + + request = discoverRootResource(device) + RESOURCE_PATH_PREFIX + + request; + + addNotificationListener(device, listener); + + PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device); + periodicRunnableTable.put(device, periodicRunnable); + scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS); + } + + public void stopNotifications(DeviceId device) { + try { + periodicRunnableTable.get(device).terminate(); + processorRunnableTable.get(device).terminate(); + } catch (Exception ex) { + log.error("stopNotifications::Exception happened when terminating, ex: {}", ex); + } + log.info("stopNotifications::Runnable is now terminated"); + periodicRunnableTable.remove(device); + processorRunnableTable.remove(device); + log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString()); + } + + @Override + public void deleteSubscriptionAndSseConnection(Map paramMap, SvcLogicContext ctx) { + String deleteSubscribeUrlString = paramMap.get(REST_API_URL); + URL deleteSubscribeUrl = null; + RestSBDevice dev = null; + try { + deleteSubscribeUrl = new URL(deleteSubscribeUrlString); + dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + String deviceIp = deleteSubscribeUrl.getHost(); + String devicePort = String.valueOf(deleteSubscribeUrl.getPort()); + log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}", + deviceIp, devicePort); + if (dev == null) { + log.error("deleteSubscriptionAndSseConnection::device does not exist in the map"); + return; + } + String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId()); + + if (subscriptionId != null) { + log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId); + log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request"); + try { + ctx.setAttribute("subscriptionId", subscriptionId); + restapiCallNode.sendRequest(paramMap, ctx); + if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { + log.info("deleteSubscriptionAndSseConnection::Successfully unsubscribed"); + stopNotifications(dev.deviceId()); + subscribedDevicesTable.remove(dev.deviceId()); + + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + subscriptionInfoMap.remove(id); + } + + } else { + log.info("deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull"); + } + } catch (SvcLogicException e) { + log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e); + } + } else { + log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed"); + } + } + + public class PeriodicPullRunnable implements Runnable { + private String request; + private DeviceId deviceId; + + private volatile boolean running = true; + + public void terminate() { + log.info("PeriodicPullRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + /** + * @param request request + * @param deviceId device identifier + */ + public PeriodicPullRunnable(String request, DeviceId deviceId) { + this.request = request; + this.deviceId = deviceId; + } + + @Override + public void run() { + log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}", + Thread.currentThread().getId(), running); + try { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(getUrlString(deviceId, request)); + log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString()); + Response response = null; + if (running) { + response = target.request().get(); + String rcvdData = response.readEntity(String.class); + log.trace("PeriodicPullRunnable.run()::after readEntity"); + BlockingQueue eventQ = getEventQ(deviceId); + if (eventQ != null) { + eventQ.add(rcvdData); + eventQMap.put(deviceId, eventQ); + log.trace("PeriodicPullRunnable.run()::eventQ got filled."); + } else { + log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}", + deviceId); + } + } else { + log.trace("PeriodicPullRunnable.run()::running is false! " + + "closing the client and the response, threadID: {}", Thread.currentThread().getId()); + response.close(); + client.close(); + log.info("PeriodicPullRunnable.run()::eventInput is closed in run()"); + } + } catch (Exception ex) { + log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex, + Thread.currentThread().getId()); + } + log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ", + Thread.currentThread().getId()); + } + } + + public class InternalPeriodicPullingProcessorRunnable implements Runnable { + + private volatile boolean running = true; + private DeviceId deviceId; + + public InternalPeriodicPullingProcessorRunnable(DeviceId deviceId) { + this.deviceId = deviceId; + } + + public void terminate() { + log.info("InternalPeriodicPullingProcessorRunnable.terminate()::threadID: {}", + Thread.currentThread().getId()); + running = false; + } + + @Override + public void run() { + log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()"); + while (running) { + try { + if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) { + log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()"); + if (running) { + String eventJsonString = eventQMap.get(deviceId).take(); + log.trace("InternalPeriodicPullingProcessorRunnable::after take()"); + log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString); + Map param = convertToProperties(eventJsonString); + String idString = param.get("push-change-update.subscription-id"); + SubscriptionInfo info = subscriptionInfoMap().get(idString); + if (info != null) { + SvcLogicContext ctx = setContext(param); + SvcLogicGraphInfo callbackDG = info.callBackDG(); + callbackDG.executeGraph(ctx); + } + } else { + log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " + + "while eventQ was blocked to process new notifications"); + log.info("InternalPeriodicPullingProcessorRunnable.run()::" + + "the client is no longer interested to receive notifications."); + break; + } + } + } catch (InterruptedException | SvcLogicException e) { + e.printStackTrace(); + } + } + } + private SvcLogicContext setContext(Map param) { + SvcLogicContext ctx = new SvcLogicContext(); + for (Map.Entry entry : param.entrySet()) { + ctx.setAttribute(entry.getKey(), entry.getValue()); + } + return ctx; + } + } + + public String discoverRootResource(DeviceId device) { + return ROOT_RESOURCE; + } + + @Override + public void addNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + } + + @Override + public void removeNotificationListener(DeviceId deviceId, + RestconfNotificationEventListener listener) { + } + + public boolean isNotificationEnabled(DeviceId deviceId) { + return periodicRunnableTable.containsKey(deviceId); + } + +} diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java index e3b051265..10529b3ab 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/RestconfDiscoveryNode.java @@ -561,6 +561,11 @@ public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDisc enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener); } + @Override + public void establishPeriodicPullConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException { + + } + @Override public void enableNotifications(DeviceId device, String request, String mediaType, diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java index 972fb2b55..d40d9a3f8 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/SvcLogicDiscoveryPlugin.java @@ -123,6 +123,36 @@ public interface SvcLogicDiscoveryPlugin extends SvcLogicJavaPlugin { */ void establishPersistentSseConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException; + /** + * Allows directed graphs to establish a periodic pull from a given controller. + * @param paramMap HashMap of parameters passed by the DG to this function + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
parameterMandatory/Optionaldescriptionexample values
templateDirNameOptionalfull path to YANG directory that can be used to build a request/sdncopt/bvc/resconfapi/test
periodicPullURLMandatoryurl to send periodically to the serverhttps://127.0.0.1:8181/restconf/data/ietf-service-pm:performance-monitoring
callbackDGMandatorycallback DG to process the received notificationResource-Discovery:handleSOTNTopology
filterURLOptionalurl which needs to be subscribed, if null subscribe to allhttp://example.com/sample-data/1.0
subscriptionTypeOptionaltype of subscription, periodic or onDataChangeonDataChange
updateFrequencyOptionalupdate frequency in milli seconds when subscription type is periodic1000
restapiUserOptionaluser name to use for http basic authenticationsdnc_ws
restapiPasswordOptionalunencrypted password to use for http basic authenticationplain_password
contentTypeOptionalhttp content type to set in the http headerusually application/json or application/xml
formatOptionalshould match request body formatjson or xml
responsePrefixOptionallocation the notification response will be written to in context memorytmp.restconfdiscovery.result
skipSendingOptionaltrue or false
convertResponse Optionalwhether the response should be convertedtrue or false
customHttpHeadersOptionala list additional http headers to be passed in, follow the format in the exampleX-CSI-MessageId=messageId,headerFieldName=headerFieldValue
dumpHeadersOptionalwhen true writes http header content to context memorytrue or false
+ * @param ctx Reference to context memory + * @throws SvcLogicException + * @since 11.0.2 + * @see String#split(String, int) + */ + void establishPeriodicPullConnection(Map paramMap, SvcLogicContext ctx) throws SvcLogicException; + /** * Allows directed graphs to modify a discovery subscription for a given subscriber. * @param paramMap HashMap of parameters passed by the DG to this function -- cgit 1.2.3-korg