diff options
-rw-r--r-- | plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java | 214 |
1 files changed, 176 insertions, 38 deletions
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java index 220de8790..909c6435e 100644 --- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java +++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java @@ -115,8 +115,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc } public void deactivate() { - log.info("RESTCONF SBI Stopped"); - executor.shutdown(); + log.info("PeriodicDiscoveryNode::deactivate: Going to shutdown the executors."); + if (executor != null) { + executor.shutdown(); + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } this.getClientMap().clear(); this.getDeviceMap().clear(); } @@ -131,19 +136,19 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public Map<DeviceId, RestSBDevice> getDevices() { - log.trace("RESTCONF SBI::getDevices"); + log.info("RESTCONF SBI::getDevices"); return ImmutableMap.copyOf(deviceMap); } @Override public RestSBDevice getDevice(DeviceId deviceInfo) { - log.trace("RESTCONF SBI::getDevice with deviceId"); + log.info("RESTCONF SBI::getDevice with deviceId"); return deviceMap.get(deviceInfo); } @Override public RestSBDevice getDevice(String ip, int port) { - log.trace("RESTCONF SBI::getDevice with ip and port"); + log.info("RESTCONF SBI::getDevice with ip and port"); try { if (!deviceMap.isEmpty()) { return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get(); @@ -156,7 +161,7 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public void addDevice(RestSBDevice device) { - log.trace("RESTCONF SBI::addDevice"); + log.info("RESTCONF SBI::addDevice"); if (!deviceMap.containsKey(device.deviceId())) { if (device.username() != null) { String username = device.username(); @@ -168,9 +173,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc InternalPeriodicPullingProcessorRunnable eventProcessorRunnable = new InternalPeriodicPullingProcessorRunnable(device.deviceId()); processorRunnableTable.put(device.deviceId(), eventProcessorRunnable); - log.trace("addDevice::restconf event processor runnable is created and is going for execute"); + log.info("addDevice::restconf event processor runnable is created and is going for execute"); + if (executor.isShutdown()) { + log.info("PeriodicPulDiscoveryNode::addDevice - executor was shutdown. Restarting it."); + executor = Executors.newCachedThreadPool(); + } executor.execute(eventProcessorRunnable); - log.trace("addDevice::restconf event processor runnable was sent for execute"); + log.info("addDevice::restconf event processor runnable was sent for execute"); deviceMap.put(device.deviceId(), device); } else { log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId()); @@ -179,13 +188,19 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public void removeDevice(DeviceId deviceId) { - log.trace("RESTCONF SBI::removeDevice"); + log.info("RESTCONF SBI::removeDevice"); eventQMap.remove(deviceId); clientMap.remove(deviceId); deviceMap.remove(deviceId); } @Override + public void enableNotifications(DeviceId device, String request, String mediaType, + RestconfNotificationEventListener callBackListener) { + + } + + @Override public void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException { String subscriberId = paramMap.get(SUBSCRIBER_ID); @@ -208,6 +223,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException { + log.info("establishSubscriptionOnly::Necessary 55 sec. delay for the hardware to finish creating the resource"); + try { + Thread.sleep(55000); + } catch (InterruptedException e) { + e.printStackTrace(); + } String subscriberId = paramMap.get(SUBSCRIBER_ID); if (subscriberId == null) { throw new SvcLogicException("Subscriber Id is null"); @@ -239,9 +260,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc restapiCallNode.sendRequest(paramMap, ctx); if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) { // TODO: save subscription id and subscriber in MYSQL - String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx); +// String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx); +// log.info("establishSubscriptionOnly::Subscription is done successfully and " + +// "the output.identifier is: {}", id); + String id = dev.ip(); log.info("establishSubscriptionOnly::Subscription is done successfully and " + - "the output.identifier is: {}", id); + "the device ip is: {}", id); log.info("establishSubscriptionOnly::The subscriptionID returned by the server " + "does not exist in the map. Adding it now..."); subscribedDevicesTable.put(dev.deviceId(), id); @@ -268,9 +292,44 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) { - String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); - if (id != null) { - subscriptionInfoMap.remove(id); +// String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); +// if (id != null) { +// subscriptionInfoMap.remove(id); +// } + + String deleteSubscribeUrlString = paramMap.get(REST_API_URL); + URL deleteSubscribeUrl = null; + RestSBDevice dev = null; + try { + deleteSubscribeUrl = new URL(deleteSubscribeUrlString); + dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort()); + } catch (MalformedURLException e) { + log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e); + return; + } + + String deviceIp = deleteSubscribeUrl.getHost(); + String devicePort = String.valueOf(deleteSubscribeUrl.getPort()); + log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}", + deviceIp, devicePort); + if (dev == null) { + log.error("deleteSubscriptionAndSseConnection::device does not exist in the map"); + return; + } + String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId()); + + if (subscriptionId != null) { + log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId); + log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request"); + stopNotifications(dev.deviceId()); + subscribedDevicesTable.remove(dev.deviceId()); + +// String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (subscriptionId != null) { + subscriptionInfoMap.remove(subscriptionId); + } + } else { + log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed"); } } @@ -460,13 +519,16 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc RestconfNotificationEventListenerImpl myListener = new RestconfNotificationEventListenerImpl(info); - enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener); + enableNotifications(dev.deviceId(), + "ietf-service-pm:performance-monitoring/service-pm=" + paramMap.get("ethServiceName"), + "json", myListener, paramMap, ctx); +// enableNotifications(dev.deviceId(), periodicPullUrlString, +// "json", myListener, paramMap, ctx); } - @Override public void enableNotifications(DeviceId device, String request, String mediaType, - RestconfNotificationEventListener listener) { + RestconfNotificationEventListener listener, Map<String, String> paramMap, SvcLogicContext ctx) { if (isNotificationEnabled(device)) { log.warn("enableNotifications::already enabled on device: {}", device); return; @@ -477,9 +539,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc addNotificationListener(device, listener); - PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device); + PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device, paramMap, ctx); periodicRunnableTable.put(device, periodicRunnable); - scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS); + if (scheduledExecutor.isShutdown()) { + log.info("PeriodicPulDiscoveryNode::enableNotifications - scheduledExecutor was shutdown. Restarting it."); + scheduledExecutor = Executors.newScheduledThreadPool(2); + } + scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 90, TimeUnit.SECONDS); } public void stopNotifications(DeviceId device) { @@ -492,7 +558,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc log.info("stopNotifications::Runnable is now terminated"); periodicRunnableTable.remove(device); processorRunnableTable.remove(device); - log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString()); + if (periodicRunnableTable.isEmpty()) { + log.info("stopNotifications::periodicRunnableTable is empty. Going to shutdown the executors"); + this.deactivate(); + log.info("stopNotifications::Executors are now shutdown."); + } + log.info("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString()); } @Override @@ -540,6 +611,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc } catch (SvcLogicException e) { log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e); } + stopNotifications(dev.deviceId()); + subscribedDevicesTable.remove(dev.deviceId()); + + String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID)); + if (id != null) { + subscriptionInfoMap.remove(id); + } } else { log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed"); } @@ -548,6 +626,8 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc public class PeriodicPullRunnable implements Runnable { private String request; private DeviceId deviceId; + private Map<String, String> paramMap; + private SvcLogicContext ctx; private volatile boolean running = true; @@ -560,36 +640,60 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc /** * @param request request * @param deviceId device identifier + * @param paramMap + * @param ctx */ - public PeriodicPullRunnable(String request, DeviceId deviceId) { + public PeriodicPullRunnable(String request, DeviceId deviceId, Map<String, String> paramMap, SvcLogicContext ctx) { this.request = request; this.deviceId = deviceId; + this.paramMap = paramMap; + this.ctx = ctx; } @Override public void run() { - log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}", + Parameters p; + WebTarget target = null; + + log.info("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}", Thread.currentThread().getId(), running); try { +// Client client = ClientBuilder.newBuilder().build(); +// WebTarget target = client.target(getUrlString(deviceId, request)); + + + log.info("PeriodicPullRunnable::sending periodic GET pm-data request to hardware"); + RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode(); + p = RestapiCallNode.getParameters(paramMap, new Parameters()); + // Client client = ignoreSslClient(p.disableHostVerification).register(SseFeature.class); Client client = ClientBuilder.newBuilder().build(); - WebTarget target = client.target(getUrlString(deviceId, request)); - log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString()); + target = restapi.addAuthType(client, p).target(getUrlString(deviceId, request)); +// target = restapi.addAuthType(client, p).target(request); + + + log.info("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString()); Response response = null; if (running) { response = target.request().get(); String rcvdData = response.readEntity(String.class); - log.trace("PeriodicPullRunnable.run()::after readEntity"); - BlockingQueue<String> eventQ = getEventQ(deviceId); - if (eventQ != null) { - eventQ.add(rcvdData); - eventQMap.put(deviceId, eventQ); - log.trace("PeriodicPullRunnable.run()::eventQ got filled."); + if (response.getStatus() == 200) { + log.info("PeriodicPullRunnable.run()::after readEntity"); + BlockingQueue<String> eventQ = getEventQ(deviceId); + if (eventQ != null) { + eventQ.add(rcvdData); + eventQMap.put(deviceId, eventQ); + log.info("PeriodicPullRunnable.run()::eventQ got filled."); + } else { + log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}", + deviceId); + } } else { - log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}", - deviceId); + log.info("PeriodicPullRunnable.run():: GET pm-data did NOT return 200: {}", rcvdData); + log.info("PeriodicPullRunnable.run():: Status code is: {}", response.getStatus()); + log.info("PeriodicPullRunnable.run():: response is: {}", response.toString()); } } else { - log.trace("PeriodicPullRunnable.run()::running is false! " + + log.info("PeriodicPullRunnable.run()::running is false! " + "closing the client and the response, threadID: {}", Thread.currentThread().getId()); response.close(); client.close(); @@ -598,10 +702,40 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc } catch (Exception ex) { log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex, Thread.currentThread().getId()); + executor.shutdown(); + scheduledExecutor.shutdown(); + log.info("PeriodicPullRunnable.run():: exceptions happened. So shutting down the executors"); } - log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ", + log.info("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ", Thread.currentThread().getId()); } + + private Client ignoreSslClient(boolean disableHostVerification) { + SSLContext sslcontext = null; + + try { + sslcontext = SSLContext.getInstance("TLS"); + sslcontext.init(null, new TrustManager[]{new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } }, new java.security.SecureRandom()); + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new IllegalStateException(e); + } + + return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build(); + } + } public class InternalPeriodicPullingProcessorRunnable implements Runnable { @@ -621,22 +755,26 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc @Override public void run() { - log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()"); + log.info("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()"); while (running) { try { if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) { - log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()"); + log.info("InternalPeriodicPullingProcessorRunnable::waiting for take()"); if (running) { String eventJsonString = eventQMap.get(deviceId).take(); - log.trace("InternalPeriodicPullingProcessorRunnable::after take()"); + log.info("InternalPeriodicPullingProcessorRunnable::after take()"); log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString); Map<String, String> param = convertToProperties(eventJsonString); - String idString = param.get("push-change-update.subscription-id"); +// String idString = param.get("push-change-update.subscription-id"); + String idString = getSubscriptionIdFromDeviceId(deviceId); + log.info("InternalPeriodicPullingProcessorRunnable::idString is {}", idString); SubscriptionInfo info = subscriptionInfoMap().get(idString); if (info != null) { + log.info("InternalPeriodicPullingProcessorRunnable::subscriptionInfo is not null; going to call the callback dg"); SvcLogicContext ctx = setContext(param); SvcLogicGraphInfo callbackDG = info.callBackDG(); callbackDG.executeGraph(ctx); + log.info("InternalPeriodicPullingProcessorRunnable::The callback dg is called"); } } else { log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " + |