diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-07-30 09:06:20 +0200 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-07-30 10:47:42 +0200 |
commit | 64d46cf2ef7e4441b46420bb6ca1c6863cc12ba6 (patch) | |
tree | ba58d70334fa96cce6b75cd2f1050d5c214532e4 /sdnr/wt/mountpoint-state-provider/provider/src/main/java | |
parent | 4aaaeb525cb04023147527da0164bbf7152fc9fd (diff) |
Update mountpoint-state-provider
Update mountpoint-state-provider for ODL Sodium support and some refactoring
Issue-ID: CCSDK-2575
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: I5a44e8d2e2b4359d66374ccf7669fbbde45ee582
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main/java')
-rw-r--r-- | sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java | 2 | ||||
-rw-r--r-- | sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java | 30 | ||||
-rw-r--r-- | sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java | 25 | ||||
-rw-r--r-- | sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java | 58 | ||||
-rw-r--r-- | sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java (renamed from sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java) | 85 |
5 files changed, 130 insertions, 70 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java index 675ac8a2f..21ca9dae7 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java @@ -38,7 +38,7 @@ public class GeneralConfig implements Configuration { private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; - private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904"; + private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmaap:3904"; public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java index 48cb76ead..f9b7b1e6a 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java @@ -18,25 +18,38 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; +import java.util.Objects; import java.util.Optional; import org.eclipse.jdt.annotation.NonNull; import org.json.JSONObject; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; - +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener { +public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class); + private NetconfNodeStateService netconfNodeStateService; + private MountpointStatePublisherMain mountpointStatePublisher; + private ListenerRegistration<MountpointNodeConnectListenerImpl> registeredNodeConnectListener; + + public MountpointNodeConnectListenerImpl(NetconfNodeStateService netconfNodeStateService) { + this.netconfNodeStateService = netconfNodeStateService; + } + + public void start(MountpointStatePublisherMain mountpointStatePublisher) { + this.mountpointStatePublisher = mountpointStatePublisher; + registeredNodeConnectListener = netconfNodeStateService.registerNetconfNodeConnectListener(this); + } @Override public void onEnterConnected(@NonNull NetconfAccessor accessor) { NodeId nNodeId = accessor.getNodeId(); NetconfNode netconfNode = accessor.getNetconfNode(); - //, MountPoint mountpoint, DataBroker netconfNodeDataBroker; LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId.getValue() + " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); @@ -46,7 +59,7 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - MountpointStatePublisher.stateObjects.add(obj); + mountpointStatePublisher.addToPublish(obj); } @Override @@ -59,13 +72,18 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList obj.put("NetConfNodeState", "Unmounted"); obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - MountpointStatePublisher.stateObjects.add(obj); + mountpointStatePublisher.addToPublish(obj); + } + + public void stop() throws Exception { + this.close(); } @Override public void close() throws Exception { - LOG.debug("In close of MountpointNodeConnectListenerImpl"); + if (!Objects.isNull(registeredNodeConnectListener)) + registeredNodeConnectListener.close(); } } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java index 36ec16298..bbfd87961 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java @@ -20,14 +20,28 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; import org.json.JSONObject; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); + private NetconfNodeStateService netconfNodeStateService; + private MountpointStatePublisherMain mountpointStatePublisher; + private ListenerRegistration<MountpointNodeStateListenerImpl> registeredNodeStateListener; + + public MountpointNodeStateListenerImpl(NetconfNodeStateService netconfNodeStateService) { + this.netconfNodeStateService = netconfNodeStateService; + } + + public void start(MountpointStatePublisherMain mountpointStatePublisher) { + this.mountpointStatePublisher = mountpointStatePublisher; + registeredNodeStateListener = netconfNodeStateService.registerNetconfNodeStateListener(this); + } @Override public void onCreated(NodeId nNodeId, NetconfNode netconfNode) { @@ -39,7 +53,7 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - MountpointStatePublisher.stateObjects.add(obj); + mountpointStatePublisher.addToPublish(obj); } @Override @@ -52,7 +66,7 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - MountpointStatePublisher.stateObjects.add(obj); + mountpointStatePublisher.addToPublish(obj); } @Override @@ -64,11 +78,16 @@ public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener obj.put("NetConfNodeState", "Removed"); obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - MountpointStatePublisher.stateObjects.add(obj); + mountpointStatePublisher.addToPublish(obj); + } + + public void stop() throws Exception { + this.close(); } @Override public void close() throws Exception { + registeredNodeStateListener.close(); } } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java index cb5cbe3e2..e31032393 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java @@ -32,7 +32,6 @@ import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateS import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("deprecation") public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener { private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class); @@ -40,16 +39,17 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties"; private NetconfNodeStateService netconfNodeStateService; - private GeneralConfig generalConfig; private boolean dmaapEnabled = false; - private Thread mountpointStatePublisher = null; - MountpointNodeConnectListenerImpl nodeConnectListener = new MountpointNodeConnectListenerImpl(); - MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl(); + private MountpointNodeConnectListenerImpl nodeConnectListener; + private MountpointNodeStateListenerImpl nodeStateListener; + private MountpointStatePublisherMain mountpointStatePublisher; public MountpointStateProviderImpl() { LOG.info("Creating provider class for {}", APPLICATION_NAME); + nodeConnectListener = null; + nodeStateListener = null; } public void setNetconfNodeStateService(NetconfNodeStateService netconfNodeStateService) { @@ -62,12 +62,12 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange new ConfigurationFileRepresentation(CONFIGURATIONFILE); configFileRepresentation.registerConfigChangedListener(this); + nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateService); + nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateService); + generalConfig = new GeneralConfig(configFileRepresentation); if (generalConfig.getEnabled()) { //dmaapEnabled - mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); - mountpointStatePublisher.start(); - netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); - netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); + startPublishing(); } } @@ -85,35 +85,45 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - // DMaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s) + // DMaap disabled earlier (or during bundle startup) but enabled later, start publisher(s) if (!dmaapEnabled && dmaapEnabledNewVal) { LOG.info("DMaaP is enabled, starting Publisher"); - mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); - mountpointStatePublisher.start(); - netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); - netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); + startPublishing(); } else if (dmaapEnabled && !dmaapEnabledNewVal) { - // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) + // DMaap enabled earlier (or during bundle startup) but disabled later, stop publisher(s) LOG.info("DMaaP is disabled, stop publisher"); - try { - MountpointStatePublisher.stopPublisher(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } + stopPublishing(); } dmaapEnabled = dmaapEnabledNewVal; } + public void startPublishing() { + mountpointStatePublisher = new MountpointStatePublisherMain(generalConfig); + mountpointStatePublisher.start(); + + nodeConnectListener.start(mountpointStatePublisher); + nodeStateListener.start(mountpointStatePublisher); + } + + public void stopPublishing() { + try { + nodeConnectListener.stop(); + nodeStateListener.stop(); + mountpointStatePublisher.stop(); + } catch (Exception e) { + LOG.error("Exception while stopping publisher ", e); + } + } + @Override public void close() throws Exception { LOG.info("{} closing ...", this.getClass().getName()); - //close(updateService, configService, mwtnService); issue#1 try { - MountpointStatePublisher.stopPublisher(); + mountpointStatePublisher.stop(); } catch (IOException | InterruptedException e) { LOG.error("Exception while stopping publisher ", e); } - //close(updateService, mwtnService); + close(nodeConnectListener, nodeStateListener); LOG.info("{} closing done", APPLICATION_NAME); } @@ -123,7 +133,6 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange * @param toClose * @throws Exception */ - @SuppressWarnings("unused") private void close(AutoCloseable... toCloseList) throws Exception { for (AutoCloseable element : toCloseList) { if (element != null) { @@ -131,5 +140,4 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange } } } - } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java index 7f9fb2370..8d6e2d4a3 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java @@ -21,7 +21,6 @@ * ============LICENSE_END======================================================= * */ - package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; import java.io.IOException; @@ -29,7 +28,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; - import org.json.JSONObject; import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; import org.onap.dmaap.mr.client.MRBatchingPublisher; @@ -38,18 +36,17 @@ import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +public class MountpointStatePublisherMain { -public class MountpointStatePublisher implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); - public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>(); - static MRBatchingPublisher pub; - Properties publisherProperties = new Properties(); - static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl - private int fetchPause = 5000; // Default pause between fetch - 5 seconds + private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisherMain.class); + private Thread thread = null; + private MRBatchingPublisher pub = null; + private List<JSONObject> stateObjects = new LinkedList<JSONObject>(); + private Properties publisherProperties = new Properties(); + private boolean closePublisher = false; + private int publishPause = 5000; // Default pause between fetch - 5 seconds - - public MountpointStatePublisher(Configuration config) { + public MountpointStatePublisherMain(Configuration config) { initialize(config); } @@ -76,7 +73,6 @@ public class MountpointStatePublisher implements Runnable { return pub; } catch (IOException e) { LOG.info("Exception while creating a publisher", e); - } return null; } @@ -94,36 +90,55 @@ public class MountpointStatePublisher implements Runnable { return pub; } - public void run() { - - while (!closePublisher) { - try { - if (stateObjects.size() > 0) { - JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst(); - publishMessage(getPublisher(), obj.toString()); - } else { - pauseThread(); - } - } catch (Exception ex) { - LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); - } + public void start() { + thread = new Thread(new MountpointStatePublisher()); + thread.start(); + } - MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called - LOG.debug("Response message = {} ", res.toString()); - } + public void stop() throws IOException, InterruptedException { + closePublisher = true; + getPublisher().close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close) } private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause); - Thread.sleep(fetchPause); + if (publishPause > 0) { + LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause); + Thread.sleep(publishPause); } else { LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); } } - public static void stopPublisher() throws IOException, InterruptedException { - closePublisher = true; - pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close + public void addToPublish(JSONObject publishObj) { + getStateObjects().add(publishObj); } + + public List<JSONObject> getStateObjects() { + return stateObjects; + } + + public class MountpointStatePublisher implements Runnable { + + @Override + public void run() { + while (!closePublisher) { + try { + if (getStateObjects().size() > 0) { + JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst(); + publishMessage(getPublisher(), obj.toString()); + } else { + pauseThread(); + } + } catch (Exception ex) { + LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); + } + + MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called + LOG.debug("Response message = {} ", res.toString()); + } + + } + + } + } |