summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorHesam Rahimi <hesam.rahimi@huawei.com>2022-10-28 18:18:52 -0400
committerHesam Rahimi <hesam.rahimi@huawei.com>2022-10-28 22:26:25 +0000
commitdd1d836248e804a9331ed7a807bb8acecdecf66d (patch)
tree855b0c28f9fd3b76a4eb73db857ba79e7ba46142 /plugins
parentb28d2307b3dad32076b1842025d643859c2297e1 (diff)
Bug fixing for the periodic pulling of Performance Monitoring (PM) data from 3rd party controllers.
Issue-ID: CCSDK-3752 Signed-off-by: Hesam Rahimi <hesam.rahimi@huawei.com> Change-Id: Ie5a0b65686a101e7416608fc10135d25a974aedd
Diffstat (limited to 'plugins')
-rw-r--r--plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java214
1 files changed, 176 insertions, 38 deletions
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java
index 220de8790..909c6435e 100644
--- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/PeriodicDiscoveryNode.java
@@ -115,8 +115,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
}
public void deactivate() {
- log.info("RESTCONF SBI Stopped");
- executor.shutdown();
+ log.info("PeriodicDiscoveryNode::deactivate: Going to shutdown the executors.");
+ if (executor != null) {
+ executor.shutdown();
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ }
this.getClientMap().clear();
this.getDeviceMap().clear();
}
@@ -131,19 +136,19 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public Map<DeviceId, RestSBDevice> getDevices() {
- log.trace("RESTCONF SBI::getDevices");
+ log.info("RESTCONF SBI::getDevices");
return ImmutableMap.copyOf(deviceMap);
}
@Override
public RestSBDevice getDevice(DeviceId deviceInfo) {
- log.trace("RESTCONF SBI::getDevice with deviceId");
+ log.info("RESTCONF SBI::getDevice with deviceId");
return deviceMap.get(deviceInfo);
}
@Override
public RestSBDevice getDevice(String ip, int port) {
- log.trace("RESTCONF SBI::getDevice with ip and port");
+ log.info("RESTCONF SBI::getDevice with ip and port");
try {
if (!deviceMap.isEmpty()) {
return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
@@ -156,7 +161,7 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public void addDevice(RestSBDevice device) {
- log.trace("RESTCONF SBI::addDevice");
+ log.info("RESTCONF SBI::addDevice");
if (!deviceMap.containsKey(device.deviceId())) {
if (device.username() != null) {
String username = device.username();
@@ -168,9 +173,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
InternalPeriodicPullingProcessorRunnable eventProcessorRunnable =
new InternalPeriodicPullingProcessorRunnable(device.deviceId());
processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
- log.trace("addDevice::restconf event processor runnable is created and is going for execute");
+ log.info("addDevice::restconf event processor runnable is created and is going for execute");
+ if (executor.isShutdown()) {
+ log.info("PeriodicPulDiscoveryNode::addDevice - executor was shutdown. Restarting it.");
+ executor = Executors.newCachedThreadPool();
+ }
executor.execute(eventProcessorRunnable);
- log.trace("addDevice::restconf event processor runnable was sent for execute");
+ log.info("addDevice::restconf event processor runnable was sent for execute");
deviceMap.put(device.deviceId(), device);
} else {
log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
@@ -179,13 +188,19 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public void removeDevice(DeviceId deviceId) {
- log.trace("RESTCONF SBI::removeDevice");
+ log.info("RESTCONF SBI::removeDevice");
eventQMap.remove(deviceId);
clientMap.remove(deviceId);
deviceMap.remove(deviceId);
}
@Override
+ public void enableNotifications(DeviceId device, String request, String mediaType,
+ RestconfNotificationEventListener callBackListener) {
+
+ }
+
+ @Override
public void establishSubscription(Map<String, String> paramMap,
SvcLogicContext ctx) throws SvcLogicException {
String subscriberId = paramMap.get(SUBSCRIBER_ID);
@@ -208,6 +223,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
throws SvcLogicException {
+ log.info("establishSubscriptionOnly::Necessary 55 sec. delay for the hardware to finish creating the resource");
+ try {
+ Thread.sleep(55000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
String subscriberId = paramMap.get(SUBSCRIBER_ID);
if (subscriberId == null) {
throw new SvcLogicException("Subscriber Id is null");
@@ -239,9 +260,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
restapiCallNode.sendRequest(paramMap, ctx);
if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
// TODO: save subscription id and subscriber in MYSQL
- String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+// String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+// log.info("establishSubscriptionOnly::Subscription is done successfully and " +
+// "the output.identifier is: {}", id);
+ String id = dev.ip();
log.info("establishSubscriptionOnly::Subscription is done successfully and " +
- "the output.identifier is: {}", id);
+ "the device ip is: {}", id);
log.info("establishSubscriptionOnly::The subscriptionID returned by the server " +
"does not exist in the map. Adding it now...");
subscribedDevicesTable.put(dev.deviceId(), id);
@@ -268,9 +292,44 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
- String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
- if (id != null) {
- subscriptionInfoMap.remove(id);
+// String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+// if (id != null) {
+// subscriptionInfoMap.remove(id);
+// }
+
+ String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
+ URL deleteSubscribeUrl = null;
+ RestSBDevice dev = null;
+ try {
+ deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
+ dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
+ } catch (MalformedURLException e) {
+ log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+ return;
+ }
+
+ String deviceIp = deleteSubscribeUrl.getHost();
+ String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
+ log.info("deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
+ deviceIp, devicePort);
+ if (dev == null) {
+ log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
+ return;
+ }
+ String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
+
+ if (subscriptionId != null) {
+ log.info("deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
+ log.info("deleteSubscriptionAndSseConnection::About to send unsubscribe request");
+ stopNotifications(dev.deviceId());
+ subscribedDevicesTable.remove(dev.deviceId());
+
+// String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+ if (subscriptionId != null) {
+ subscriptionInfoMap.remove(subscriptionId);
+ }
+ } else {
+ log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
}
}
@@ -460,13 +519,16 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
RestconfNotificationEventListenerImpl myListener =
new RestconfNotificationEventListenerImpl(info);
- enableNotifications(dev.deviceId(), "ietf-service-pm:performance-monitoring", "json", myListener);
+ enableNotifications(dev.deviceId(),
+ "ietf-service-pm:performance-monitoring/service-pm=" + paramMap.get("ethServiceName"),
+ "json", myListener, paramMap, ctx);
+// enableNotifications(dev.deviceId(), periodicPullUrlString,
+// "json", myListener, paramMap, ctx);
}
- @Override
public void enableNotifications(DeviceId device, String request,
String mediaType,
- RestconfNotificationEventListener listener) {
+ RestconfNotificationEventListener listener, Map<String, String> paramMap, SvcLogicContext ctx) {
if (isNotificationEnabled(device)) {
log.warn("enableNotifications::already enabled on device: {}", device);
return;
@@ -477,9 +539,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
addNotificationListener(device, listener);
- PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device);
+ PeriodicPullRunnable periodicRunnable = new PeriodicPullRunnable(request, device, paramMap, ctx);
periodicRunnableTable.put(device, periodicRunnable);
- scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 60, TimeUnit.SECONDS);
+ if (scheduledExecutor.isShutdown()) {
+ log.info("PeriodicPulDiscoveryNode::enableNotifications - scheduledExecutor was shutdown. Restarting it.");
+ scheduledExecutor = Executors.newScheduledThreadPool(2);
+ }
+ scheduledExecutor.scheduleAtFixedRate(periodicRunnable, 0, 90, TimeUnit.SECONDS);
}
public void stopNotifications(DeviceId device) {
@@ -492,7 +558,12 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
log.info("stopNotifications::Runnable is now terminated");
periodicRunnableTable.remove(device);
processorRunnableTable.remove(device);
- log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
+ if (periodicRunnableTable.isEmpty()) {
+ log.info("stopNotifications::periodicRunnableTable is empty. Going to shutdown the executors");
+ this.deactivate();
+ log.info("stopNotifications::Executors are now shutdown.");
+ }
+ log.info("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
}
@Override
@@ -540,6 +611,13 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
} catch (SvcLogicException e) {
log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
}
+ stopNotifications(dev.deviceId());
+ subscribedDevicesTable.remove(dev.deviceId());
+
+ String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+ if (id != null) {
+ subscriptionInfoMap.remove(id);
+ }
} else {
log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
}
@@ -548,6 +626,8 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
public class PeriodicPullRunnable implements Runnable {
private String request;
private DeviceId deviceId;
+ private Map<String, String> paramMap;
+ private SvcLogicContext ctx;
private volatile boolean running = true;
@@ -560,36 +640,60 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
/**
* @param request request
* @param deviceId device identifier
+ * @param paramMap
+ * @param ctx
*/
- public PeriodicPullRunnable(String request, DeviceId deviceId) {
+ public PeriodicPullRunnable(String request, DeviceId deviceId, Map<String, String> paramMap, SvcLogicContext ctx) {
this.request = request;
this.deviceId = deviceId;
+ this.paramMap = paramMap;
+ this.ctx = ctx;
}
@Override
public void run() {
- log.trace("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
+ Parameters p;
+ WebTarget target = null;
+
+ log.info("PeriodicPullRunnable.run()::threadID is: {} ...., running is: {}",
Thread.currentThread().getId(), running);
try {
+// Client client = ClientBuilder.newBuilder().build();
+// WebTarget target = client.target(getUrlString(deviceId, request));
+
+
+ log.info("PeriodicPullRunnable::sending periodic GET pm-data request to hardware");
+ RestapiCallNode restapi = restconfApiCallNode.getRestapiCallNode();
+ p = RestapiCallNode.getParameters(paramMap, new Parameters());
+ // Client client = ignoreSslClient(p.disableHostVerification).register(SseFeature.class);
Client client = ClientBuilder.newBuilder().build();
- WebTarget target = client.target(getUrlString(deviceId, request));
- log.trace("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
+ target = restapi.addAuthType(client, p).target(getUrlString(deviceId, request));
+// target = restapi.addAuthType(client, p).target(request);
+
+
+ log.info("PeriodicPullRunnable.run()::target URI is {}", target.getUri().toString());
Response response = null;
if (running) {
response = target.request().get();
String rcvdData = response.readEntity(String.class);
- log.trace("PeriodicPullRunnable.run()::after readEntity");
- BlockingQueue<String> eventQ = getEventQ(deviceId);
- if (eventQ != null) {
- eventQ.add(rcvdData);
- eventQMap.put(deviceId, eventQ);
- log.trace("PeriodicPullRunnable.run()::eventQ got filled.");
+ if (response.getStatus() == 200) {
+ log.info("PeriodicPullRunnable.run()::after readEntity");
+ BlockingQueue<String> eventQ = getEventQ(deviceId);
+ if (eventQ != null) {
+ eventQ.add(rcvdData);
+ eventQMap.put(deviceId, eventQ);
+ log.info("PeriodicPullRunnable.run()::eventQ got filled.");
+ } else {
+ log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
+ deviceId);
+ }
} else {
- log.error("PeriodicPullRunnable.run()::eventQ has not been initialized for this device {}",
- deviceId);
+ log.info("PeriodicPullRunnable.run():: GET pm-data did NOT return 200: {}", rcvdData);
+ log.info("PeriodicPullRunnable.run():: Status code is: {}", response.getStatus());
+ log.info("PeriodicPullRunnable.run():: response is: {}", response.toString());
}
} else {
- log.trace("PeriodicPullRunnable.run()::running is false! " +
+ log.info("PeriodicPullRunnable.run()::running is false! " +
"closing the client and the response, threadID: {}", Thread.currentThread().getId());
response.close();
client.close();
@@ -598,10 +702,40 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
} catch (Exception ex) {
log.info("PeriodicPullRunnable.run()::We got some exception: {}, threadID: {} ", ex,
Thread.currentThread().getId());
+ executor.shutdown();
+ scheduledExecutor.shutdown();
+ log.info("PeriodicPullRunnable.run():: exceptions happened. So shutting down the executors");
}
- log.trace("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
+ log.info("PeriodicPullRunnable.run()::after Runnable Try Catch. threadID: {} ",
Thread.currentThread().getId());
}
+
+ private Client ignoreSslClient(boolean disableHostVerification) {
+ SSLContext sslcontext = null;
+
+ try {
+ sslcontext = SSLContext.getInstance("TLS");
+ sslcontext.init(null, new TrustManager[]{new X509TrustManager() {
+ @Override
+ public void checkClientTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ } }, new java.security.SecureRandom());
+ } catch (NoSuchAlgorithmException | KeyManagementException e) {
+ throw new IllegalStateException(e);
+ }
+
+ return ClientBuilder.newBuilder().sslContext(sslcontext).hostnameVerifier(new AcceptIpAddressHostNameVerifier(disableHostVerification)).build();
+ }
+
}
public class InternalPeriodicPullingProcessorRunnable implements Runnable {
@@ -621,22 +755,26 @@ public class PeriodicDiscoveryNode implements RestConfSBController, SvcLogicDisc
@Override
public void run() {
- log.trace("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
+ log.info("InternalPeriodicPullingProcessorRunnable::restconf event processor runnable inside run()");
while (running) {
try {
if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
- log.trace("InternalPeriodicPullingProcessorRunnable::waiting for take()");
+ log.info("InternalPeriodicPullingProcessorRunnable::waiting for take()");
if (running) {
String eventJsonString = eventQMap.get(deviceId).take();
- log.trace("InternalPeriodicPullingProcessorRunnable::after take()");
+ log.info("InternalPeriodicPullingProcessorRunnable::after take()");
log.info("InternalPeriodicPullingProcessorRunnable::eventJsonString is {}", eventJsonString);
Map<String, String> param = convertToProperties(eventJsonString);
- String idString = param.get("push-change-update.subscription-id");
+// String idString = param.get("push-change-update.subscription-id");
+ String idString = getSubscriptionIdFromDeviceId(deviceId);
+ log.info("InternalPeriodicPullingProcessorRunnable::idString is {}", idString);
SubscriptionInfo info = subscriptionInfoMap().get(idString);
if (info != null) {
+ log.info("InternalPeriodicPullingProcessorRunnable::subscriptionInfo is not null; going to call the callback dg");
SvcLogicContext ctx = setContext(param);
SvcLogicGraphInfo callbackDG = info.callBackDG();
callbackDG.executeGraph(ctx);
+ log.info("InternalPeriodicPullingProcessorRunnable::The callback dg is called");
}
} else {
log.info("InternalPeriodicPullingProcessorRunnable.run()::running has changed to false " +