aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-state-provider/provider/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main')
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java176
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java10
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java65
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java24
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java169
5 files changed, 229 insertions, 215 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
index 830346027..675ac8a2f 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
@@ -22,111 +22,121 @@ import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRep
/**
* Configuration of mountpoint-state-provider, general section<br>
- * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not
- * Generates default Configuration properties if none exist or exist partially
- * Generates Publisher properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
- * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements
+ * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default
+ * Configuration properties if none exist or exist partially Generates Publisher properties only for
+ * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
+ * generated by default. For a list of applicable properties for the different TranportType values, please see -
+ * https://wiki.onap.org/display/DW/Feature+configuration+requirements
*/
public class GeneralConfig implements Configuration {
private static final String SECTION_MARKER = "general";
- private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"disabled";
-
+ private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"disabled";
+
public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType";
private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH";
-
+
public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host";
private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904";
-
+
public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic";
private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO";
-
+
public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype";
private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json";
-
+
public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout";
private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000";
public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit";
private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000";
-
+
public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize";
public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100";
-
+
public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs";
public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250";
-
+
public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance";
public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50";
- private final ConfigurationFileRepresentation configuration;
-
- public GeneralConfig(ConfigurationFileRepresentation configuration) {
- this.configuration = configuration;
- this.configuration.addSection(SECTION_MARKER);
- defaults();
- }
-
- public Boolean getEnabled() {
- Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
- return enabled;
- }
-
- public String getHostPort() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT);
- }
-
- public String getTransportType() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE);
- }
-
- public String getTopic() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC);
- }
-
- public String getTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT);
- }
-
- public String getLimit() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT);
- }
-
- public String getContenttype() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE);
- }
-
- public String getMaxBatchSize() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE);
- }
-
- public String getMaxAgeMs() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS);
- }
-
- public String getMessageSentThreadOccurrence() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
- }
-
- @Override
- public String getSectionName() {
- return SECTION_MARKER;
- }
-
- @Override
- public void defaults() {
- // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, DEFAULT_VALUE_PUBLISHER_HOST_PORT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, DEFAULT_VALUE_PUBLISHER_TOPIC);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, DEFAULT_VALUE_PUBLISHER_CONTENTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, DEFAULT_VALUE_PUBLISHER_TIMEOUT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, DEFAULT_VALUE_PUBLISHER_LIMIT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, DEFAULT_VALUE_PUBLISHER_MAXAGEMS);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
- }
+ private final ConfigurationFileRepresentation configuration;
+
+ public GeneralConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ this.configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ public Boolean getEnabled() {
+ Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
+ return enabled;
+ }
+
+ public String getHostPort() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT);
+ }
+
+ public String getTransportType() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT);
+ }
+
+ public String getContenttype() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE);
+ }
+
+ public String getMaxBatchSize() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE);
+ }
+
+ public String getMaxAgeMs() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS);
+ }
+
+ public String getMessageSentThreadOccurrence() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+ // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE,
+ DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT,
+ DEFAULT_VALUE_PUBLISHER_HOST_PORT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC,
+ DEFAULT_VALUE_PUBLISHER_TOPIC);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE,
+ DEFAULT_VALUE_PUBLISHER_CONTENTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT,
+ DEFAULT_VALUE_PUBLISHER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT,
+ DEFAULT_VALUE_PUBLISHER_LIMIT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE,
+ DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS,
+ DEFAULT_VALUE_PUBLISHER_MAXAGEMS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE,
+ DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
+ }
}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
index 9d7ce090d..48cb76ead 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
@@ -38,8 +38,8 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList
NetconfNode netconfNode = accessor.getNetconfNode();
//, MountPoint mountpoint, DataBroker netconfNodeDataBroker;
- LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId.getValue()+
- " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
+ LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId.getValue()
+ + " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
JSONObject obj = new JSONObject();
obj.put("NodeId", nNodeId.getValue());
@@ -52,7 +52,7 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList
@Override
public void onLeaveConnected(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
- LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId);
+ LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId);
JSONObject obj = new JSONObject();
obj.put("NodeId", nNodeId.getValue());
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
index fb2651ee4..b7d76f38d 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
@@ -29,43 +29,44 @@ import org.slf4j.LoggerFactory;
public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener {
- private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class);
- @Override
- public void onCreated(NodeId nNodeId, NetconfNode netconfNode) {
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class);
- LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+
- " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().toString());
- JSONObject obj = new JSONObject();
- obj.put("NodeId", nNodeId.getValue());
- obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
- obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+ @Override
+ public void onCreated(NodeId nNodeId, NetconfNode netconfNode) {
- MountpointStatePublisher.stateObjects.add(obj);
- }
+ LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId.getValue() + " IP Address = "
+ + netconfNode.getHost().getIpAddress().getIpv4Address().toString());
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- @Override
- public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) {
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
- LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+
- " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
- JSONObject obj = new JSONObject();
- obj.put("NodeId", nNodeId.getValue());
- obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
- obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+ @Override
+ public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) {
- MountpointStatePublisher.stateObjects.add(obj);
- }
+ LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId.getValue()
+ + " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
- @Override
- public void onRemoved(NodeId nNodeId) {
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
- LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId);
- JSONObject obj = new JSONObject();
- obj.put("NodeId", nNodeId.getValue());
- obj.put("NetConfNodeState", "Removed");
- obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+ @Override
+ public void onRemoved(NodeId nNodeId) {
- MountpointStatePublisher.stateObjects.add(obj);
- }
+ LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId);
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", "Removed");
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
index 0223e7323..cb5cbe3e2 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
@@ -58,7 +58,8 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
public void init() {
LOG.info("Init call for {}", APPLICATION_NAME);
- ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+ ConfigurationFileRepresentation configFileRepresentation =
+ new ConfigurationFileRepresentation(CONFIGURATIONFILE);
configFileRepresentation.registerConfigChangedListener(this);
generalConfig = new GeneralConfig(configFileRepresentation);
@@ -72,6 +73,7 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
/**
* Reflect status for Unit Tests
+ *
* @return Text with status
*/
public String isInitializationOk() {
@@ -91,13 +93,13 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener);
netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener);
} else if (dmaapEnabled && !dmaapEnabledNewVal) {
- // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
LOG.info("DMaaP is disabled, stop publisher");
try {
- MountpointStatePublisher.stopPublisher();
- } catch (IOException | InterruptedException e) {
- LOG.error("Exception while stopping publisher ", e);
- }
+ MountpointStatePublisher.stopPublisher();
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception while stopping publisher ", e);
+ }
}
dmaapEnabled = dmaapEnabledNewVal;
}
@@ -107,12 +109,12 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange
LOG.info("{} closing ...", this.getClass().getName());
//close(updateService, configService, mwtnService); issue#1
try {
- MountpointStatePublisher.stopPublisher();
- } catch (IOException | InterruptedException e) {
- LOG.error("Exception while stopping publisher ", e);
- }
+ MountpointStatePublisher.stopPublisher();
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception while stopping publisher ", e);
+ }
//close(updateService, mwtnService);
- LOG.info("{} closing done",APPLICATION_NAME);
+ LOG.info("{} closing done", APPLICATION_NAME);
}
/**
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
index 30857dec5..7f9fb2370 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
@@ -41,88 +41,89 @@ import org.slf4j.LoggerFactory;
public class MountpointStatePublisher implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
- public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
- static MRBatchingPublisher pub;
- Properties publisherProperties = new Properties();
- static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl
- private int fetchPause = 5000; // Default pause between fetch - 5 seconds
-
-
- public MountpointStatePublisher(Configuration config) {
- initialize(config);
- }
-
- public void initialize(Configuration config) {
- LOG.info("In initializePublisher method of MountpointStatePublisher");
- GeneralConfig generalCfg = (GeneralConfig)config;
-
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
- publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence());
-
- createPublisher(publisherProperties);
- }
-
- public MRBatchingPublisher createPublisher(Properties publisherProperties) {
-
- try {
- pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
- return pub;
- } catch (IOException e) {
- LOG.info("Exception while creating a publisher", e);
-
- }
- return null;
- }
-
- public void publishMessage(MRBatchingPublisher pub, String msg) {
- LOG.info("Publishing message {} - ", msg);
- try {
- pub.send(msg);
- } catch (IOException e) {
- LOG.info("Exception while publishing a mesage ", e);
- }
- }
-
- public MRBatchingPublisher getPublisher() {
- return pub;
- }
-
- public void run() {
-
- while (!closePublisher) {
- try {
- if (stateObjects.size() > 0) {
- JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
- publishMessage(getPublisher(), obj.toString());
- } else {
- pauseThread();
- }
- } catch(Exception ex) {
- LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
- }
-
- MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
- LOG.debug("Response message = {} ",res.toString());
- }
- }
-
- private void pauseThread() throws InterruptedException {
- if (fetchPause > 0) {
- LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause);
- Thread.sleep(fetchPause);
- } else {
- LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
- }
- }
-
- public static void stopPublisher() throws IOException, InterruptedException {
- closePublisher = true;
- pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close
- }
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
+ public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
+ static MRBatchingPublisher pub;
+ Properties publisherProperties = new Properties();
+ static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+
+
+ public MountpointStatePublisher(Configuration config) {
+ initialize(config);
+ }
+
+ public void initialize(Configuration config) {
+ LOG.info("In initializePublisher method of MountpointStatePublisher");
+ GeneralConfig generalCfg = (GeneralConfig) config;
+
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE,
+ generalCfg.getMessageSentThreadOccurrence());
+
+ createPublisher(publisherProperties);
+ }
+
+ public MRBatchingPublisher createPublisher(Properties publisherProperties) {
+
+ try {
+ pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
+ return pub;
+ } catch (IOException e) {
+ LOG.info("Exception while creating a publisher", e);
+
+ }
+ return null;
+ }
+
+ public void publishMessage(MRBatchingPublisher pub, String msg) {
+ LOG.info("Publishing message {} - ", msg);
+ try {
+ pub.send(msg);
+ } catch (IOException e) {
+ LOG.info("Exception while publishing a mesage ", e);
+ }
+ }
+
+ public MRBatchingPublisher getPublisher() {
+ return pub;
+ }
+
+ public void run() {
+
+ while (!closePublisher) {
+ try {
+ if (stateObjects.size() > 0) {
+ JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
+ publishMessage(getPublisher(), obj.toString());
+ } else {
+ pauseThread();
+ }
+ } catch (Exception ex) {
+ LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
+ }
+
+ MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
+ LOG.debug("Response message = {} ", res.toString());
+ }
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause);
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ public static void stopPublisher() throws IOException, InterruptedException {
+ closePublisher = true;
+ pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close
+ }
}