summaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-state-provider/provider/src/main
diff options
context:
space:
mode:
authorRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2020-07-30 09:06:20 +0200
committerRavi Pendurty <ravi.pendurty@highstreet-technologies.com>2020-07-30 10:47:42 +0200
commit64d46cf2ef7e4441b46420bb6ca1c6863cc12ba6 (patch)
treeba58d70334fa96cce6b75cd2f1050d5c214532e4 /sdnr/wt/mountpoint-state-provider/provider/src/main
parent4aaaeb525cb04023147527da0164bbf7152fc9fd (diff)
Update mountpoint-state-provider
Update mountpoint-state-provider for ODL Sodium support and some refactoring Issue-ID: CCSDK-2575 Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> Change-Id: I5a44e8d2e2b4359d66374ccf7669fbbde45ee582 Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main')
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java2
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java30
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java25
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java58
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java (renamed from sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java)85
5 files changed, 130 insertions, 70 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
index 675ac8a2f..21ca9dae7 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
@@ -38,7 +38,7 @@ public class GeneralConfig implements Configuration {
private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH";
public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host";
- private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904";
+ private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmaap:3904";
public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic";
private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO";
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
index 48cb76ead..f9b7b1e6a 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
@@ -18,25 +18,38 @@
package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+import java.util.Objects;
import java.util.Optional;
import org.eclipse.jdt.annotation.NonNull;
import org.json.JSONObject;
import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener {
+public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class);
+ private NetconfNodeStateService netconfNodeStateService;
+ private MountpointStatePublisherMain mountpointStatePublisher;
+ private ListenerRegistration<MountpointNodeConnectListenerImpl> registeredNodeConnectListener;
+
+ public MountpointNodeConnectListenerImpl(NetconfNodeStateService netconfNodeStateService) {
+ this.netconfNodeStateService = netconfNodeStateService;
+ }
+
+ public void start(MountpointStatePublisherMain mountpointStatePublisher) {
+ this.mountpointStatePublisher = mountpointStatePublisher;
+ registeredNodeConnectListener = netconfNodeStateService.registerNetconfNodeConnectListener(this);
+ }
@Override
public void onEnterConnected(@NonNull NetconfAccessor accessor) {
NodeId nNodeId = accessor.getNodeId();
NetconfNode netconfNode = accessor.getNetconfNode();
- //, MountPoint mountpoint, DataBroker netconfNodeDataBroker;
LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId.getValue()
+ " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
@@ -46,7 +59,7 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList
obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- MountpointStatePublisher.stateObjects.add(obj);
+ mountpointStatePublisher.addToPublish(obj);
}
@Override
@@ -59,13 +72,18 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList
obj.put("NetConfNodeState", "Unmounted");
obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- MountpointStatePublisher.stateObjects.add(obj);
+ mountpointStatePublisher.addToPublish(obj);
+ }
+
+ public void stop() throws Exception {
+ this.close();
}
@Override
public void close() throws Exception {
-
LOG.debug("In close of MountpointNodeConnectListenerImpl");
+ if (!Objects.isNull(registeredNodeConnectListener))
+ registeredNodeConnectListener.close();
}
}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
index 36ec16298..bbfd87961 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
@@ -20,14 +20,28 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
import org.json.JSONObject;
import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class);
+ private NetconfNodeStateService netconfNodeStateService;
+ private MountpointStatePublisherMain mountpointStatePublisher;
+ private ListenerRegistration<MountpointNodeStateListenerImpl> registeredNodeStateListener;
+
+ public MountpointNodeStateListenerImpl(NetconfNodeStateService netconfNodeStateService) {
+ this.netconfNodeStateService = netconfNodeStateService;
+ }
+
+ public void start(MountpointStatePublisherMain mountpointStatePublisher) {
+ this.mountpointStatePublisher = mountpointStatePublisher;
+ registeredNodeStateListener = netconfNodeStateService.registerNetconfNodeStateListener(this);
+ }
@Override
public void onCreated(NodeId nNodeId, NetconfNode netconfNode) {
@@ -39,7 +53,7 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener
obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- MountpointStatePublisher.stateObjects.add(obj);
+ mountpointStatePublisher.addToPublish(obj);
}
@Override
@@ -52,7 +66,7 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener
obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- MountpointStatePublisher.stateObjects.add(obj);
+ mountpointStatePublisher.addToPublish(obj);
}
@Override
@@ -64,11 +78,16 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener
obj.put("NetConfNodeState", "Removed");
obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- MountpointStatePublisher.stateObjects.add(obj);
+ mountpointStatePublisher.addToPublish(obj);
+ }
+
+ public void stop() throws Exception {
+ this.close();
}
@Override
public void close() throws Exception {
+ registeredNodeStateListener.close();
}
}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
index cb5cbe3e2..e31032393 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
@@ -32,7 +32,6 @@ import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateS
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings("deprecation")
public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener {
private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class);
@@ -40,16 +39,17 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties";
private NetconfNodeStateService netconfNodeStateService;
-
private GeneralConfig generalConfig;
private boolean dmaapEnabled = false;
- private Thread mountpointStatePublisher = null;
- MountpointNodeConnectListenerImpl nodeConnectListener = new MountpointNodeConnectListenerImpl();
- MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl();
+ private MountpointNodeConnectListenerImpl nodeConnectListener;
+ private MountpointNodeStateListenerImpl nodeStateListener;
+ private MountpointStatePublisherMain mountpointStatePublisher;
public MountpointStateProviderImpl() {
LOG.info("Creating provider class for {}", APPLICATION_NAME);
+ nodeConnectListener = null;
+ nodeStateListener = null;
}
public void setNetconfNodeStateService(NetconfNodeStateService netconfNodeStateService) {
@@ -62,12 +62,12 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
new ConfigurationFileRepresentation(CONFIGURATIONFILE);
configFileRepresentation.registerConfigChangedListener(this);
+ nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateService);
+ nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateService);
+
generalConfig = new GeneralConfig(configFileRepresentation);
if (generalConfig.getEnabled()) { //dmaapEnabled
- mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig));
- mountpointStatePublisher.start();
- netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener);
- netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener);
+ startPublishing();
}
}
@@ -85,35 +85,45 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
boolean dmaapEnabledNewVal = generalConfig.getEnabled();
- // DMaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ // DMaap disabled earlier (or during bundle startup) but enabled later, start publisher(s)
if (!dmaapEnabled && dmaapEnabledNewVal) {
LOG.info("DMaaP is enabled, starting Publisher");
- mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig));
- mountpointStatePublisher.start();
- netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener);
- netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener);
+ startPublishing();
} else if (dmaapEnabled && !dmaapEnabledNewVal) {
- // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ // DMaap enabled earlier (or during bundle startup) but disabled later, stop publisher(s)
LOG.info("DMaaP is disabled, stop publisher");
- try {
- MountpointStatePublisher.stopPublisher();
- } catch (IOException | InterruptedException e) {
- LOG.error("Exception while stopping publisher ", e);
- }
+ stopPublishing();
}
dmaapEnabled = dmaapEnabledNewVal;
}
+ public void startPublishing() {
+ mountpointStatePublisher = new MountpointStatePublisherMain(generalConfig);
+ mountpointStatePublisher.start();
+
+ nodeConnectListener.start(mountpointStatePublisher);
+ nodeStateListener.start(mountpointStatePublisher);
+ }
+
+ public void stopPublishing() {
+ try {
+ nodeConnectListener.stop();
+ nodeStateListener.stop();
+ mountpointStatePublisher.stop();
+ } catch (Exception e) {
+ LOG.error("Exception while stopping publisher ", e);
+ }
+ }
+
@Override
public void close() throws Exception {
LOG.info("{} closing ...", this.getClass().getName());
- //close(updateService, configService, mwtnService); issue#1
try {
- MountpointStatePublisher.stopPublisher();
+ mountpointStatePublisher.stop();
} catch (IOException | InterruptedException e) {
LOG.error("Exception while stopping publisher ", e);
}
- //close(updateService, mwtnService);
+ close(nodeConnectListener, nodeStateListener);
LOG.info("{} closing done", APPLICATION_NAME);
}
@@ -123,7 +133,6 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
* @param toClose
* @throws Exception
*/
- @SuppressWarnings("unused")
private void close(AutoCloseable... toCloseList) throws Exception {
for (AutoCloseable element : toCloseList) {
if (element != null) {
@@ -131,5 +140,4 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
}
}
}
-
}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java
index 7f9fb2370..8d6e2d4a3 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java
@@ -21,7 +21,6 @@
* ============LICENSE_END=======================================================
*
*/
-
package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
import java.io.IOException;
@@ -29,7 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
import org.json.JSONObject;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.dmaap.mr.client.MRBatchingPublisher;
@@ -38,18 +36,17 @@ import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+public class MountpointStatePublisherMain {
-public class MountpointStatePublisher implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
- public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
- static MRBatchingPublisher pub;
- Properties publisherProperties = new Properties();
- static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl
- private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisherMain.class);
+ private Thread thread = null;
+ private MRBatchingPublisher pub = null;
+ private List<JSONObject> stateObjects = new LinkedList<JSONObject>();
+ private Properties publisherProperties = new Properties();
+ private boolean closePublisher = false;
+ private int publishPause = 5000; // Default pause between fetch - 5 seconds
-
- public MountpointStatePublisher(Configuration config) {
+ public MountpointStatePublisherMain(Configuration config) {
initialize(config);
}
@@ -76,7 +73,6 @@ public class MountpointStatePublisher implements Runnable {
return pub;
} catch (IOException e) {
LOG.info("Exception while creating a publisher", e);
-
}
return null;
}
@@ -94,36 +90,55 @@ public class MountpointStatePublisher implements Runnable {
return pub;
}
- public void run() {
-
- while (!closePublisher) {
- try {
- if (stateObjects.size() > 0) {
- JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
- publishMessage(getPublisher(), obj.toString());
- } else {
- pauseThread();
- }
- } catch (Exception ex) {
- LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
- }
+ public void start() {
+ thread = new Thread(new MountpointStatePublisher());
+ thread.start();
+ }
- MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
- LOG.debug("Response message = {} ", res.toString());
- }
+ public void stop() throws IOException, InterruptedException {
+ closePublisher = true;
+ getPublisher().close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close)
}
private void pauseThread() throws InterruptedException {
- if (fetchPause > 0) {
- LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause);
- Thread.sleep(fetchPause);
+ if (publishPause > 0) {
+ LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause);
+ Thread.sleep(publishPause);
} else {
LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
}
}
- public static void stopPublisher() throws IOException, InterruptedException {
- closePublisher = true;
- pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close
+ public void addToPublish(JSONObject publishObj) {
+ getStateObjects().add(publishObj);
}
+
+ public List<JSONObject> getStateObjects() {
+ return stateObjects;
+ }
+
+ public class MountpointStatePublisher implements Runnable {
+
+ @Override
+ public void run() {
+ while (!closePublisher) {
+ try {
+ if (getStateObjects().size() > 0) {
+ JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst();
+ publishMessage(getPublisher(), obj.toString());
+ } else {
+ pauseThread();
+ }
+ } catch (Exception ex) {
+ LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
+ }
+
+ MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
+ LOG.debug("Response message = {} ", res.toString());
+ }
+
+ }
+
+ }
+
}