aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-state-provider/provider/src/main/java
diff options
context:
space:
mode:
authorherbert <herbert.eiselt@highstreet-technologies.com>2020-01-30 11:46:02 +0100
committerHerbert Eiselt <herbert.eiselt@highstreet-technologies.com>2020-02-01 12:37:42 +0000
commit8fb01420d6e5b5c3284da57292e28ce40874aaf4 (patch)
treec5491963ca8941ea845426651d92b3791f0edfbb /sdnr/wt/mountpoint-state-provider/provider/src/main/java
parentcbbc3520048bbc539b137cac011776506b3e0f70 (diff)
SDN-R add updated app
add mountpoint-state-provider Issue-ID: SDNC-1038 Signed-off-by: herbert <herbert.eiselt@highstreet-technologies.com> Change-Id: Ib14ec872962dfd545e4f130729d3bb5a5716fde9 Signed-off-by: herbert <herbert.eiselt@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main/java')
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java132
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java71
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java71
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java126
-rw-r--r--sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java121
5 files changed, 521 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
new file mode 100644
index 000000000..830346027
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
@@ -0,0 +1,132 @@
+/*******************************************************************************
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ ******************************************************************************/
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+
+/**
+ * Configuration of mountpoint-state-provider, general section<br>
+ * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not
+ * Generates default Configuration properties if none exist or exist partially
+ * Generates Publisher properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
+ * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements
+ */
+public class GeneralConfig implements Configuration {
+
+ private static final String SECTION_MARKER = "general";
+
+ private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"disabled";
+
+ public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType";
+ private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH";
+
+ public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host";
+ private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904";
+
+ public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic";
+ private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO";
+
+ public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype";
+ private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json";
+
+ public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout";
+ private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000";
+
+ public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit";
+ private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000";
+
+ public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize";
+ public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100";
+
+ public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs";
+ public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250";
+
+ public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance";
+ public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50";
+
+ private final ConfigurationFileRepresentation configuration;
+
+ public GeneralConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ this.configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ public Boolean getEnabled() {
+ Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
+ return enabled;
+ }
+
+ public String getHostPort() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT);
+ }
+
+ public String getTransportType() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT);
+ }
+
+ public String getContenttype() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE);
+ }
+
+ public String getMaxBatchSize() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE);
+ }
+
+ public String getMaxAgeMs() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS);
+ }
+
+ public String getMessageSentThreadOccurrence() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+ // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, DEFAULT_VALUE_PUBLISHER_HOST_PORT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, DEFAULT_VALUE_PUBLISHER_TOPIC);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, DEFAULT_VALUE_PUBLISHER_CONTENTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, DEFAULT_VALUE_PUBLISHER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, DEFAULT_VALUE_PUBLISHER_LIMIT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, DEFAULT_VALUE_PUBLISHER_MAXAGEMS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
new file mode 100644
index 000000000..9d7ce090d
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ ******************************************************************************/
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import java.util.Optional;
+import org.eclipse.jdt.annotation.NonNull;
+import org.json.JSONObject;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener {
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class);
+
+ @Override
+ public void onEnterConnected(@NonNull NetconfAccessor accessor) {
+ NodeId nNodeId = accessor.getNodeId();
+ NetconfNode netconfNode = accessor.getNetconfNode();
+ //, MountPoint mountpoint, DataBroker netconfNodeDataBroker;
+
+ LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId.getValue()+
+ " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
+
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
+
+ @Override
+ public void onLeaveConnected(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
+
+ LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId);
+
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", "Unmounted");
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ LOG.debug("In close of MountpointNodeConnectListenerImpl");
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
new file mode 100644
index 000000000..fb2651ee4
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ ******************************************************************************/
+
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import org.json.JSONObject;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener {
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class);
+ @Override
+ public void onCreated(NodeId nNodeId, NetconfNode netconfNode) {
+
+ LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+
+ " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().toString());
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
+
+ @Override
+ public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) {
+
+ LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+
+ " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue());
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString());
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
+
+ @Override
+ public void onRemoved(NodeId nNodeId) {
+
+ LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId);
+ JSONObject obj = new JSONObject();
+ obj.put("NodeId", nNodeId.getValue());
+ obj.put("NetConfNodeState", "Removed");
+ obj.put("TimeStamp", java.time.Clock.systemUTC().instant());
+
+ MountpointStatePublisher.stateObjects.add(obj);
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
new file mode 100644
index 000000000..5ffae29f4
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
@@ -0,0 +1,126 @@
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+/*******************************************************************************
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ ******************************************************************************/
+
+import java.io.IOException;
+
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class);
+ private static final String APPLICATION_NAME = "mountpoint-state-provider";
+ private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties";
+
+ private NetconfNodeStateService netconfNodeStateService;
+
+ private GeneralConfig generalConfig;
+ private boolean dmaapEnabled = false;
+ private Thread mountpointStatePublisher = null;
+
+ MountpointNodeConnectListenerImpl nodeConnectListener = new MountpointNodeConnectListenerImpl();
+ MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl();
+
+ public MountpointStateProviderImpl() {
+ LOG.info("Creating provider class for {}", APPLICATION_NAME);
+ }
+
+ public void setNetconfNodeStateService(NetconfNodeStateService netconfNodeStateService) {
+ this.netconfNodeStateService = netconfNodeStateService;
+ }
+
+ public void init() {
+ LOG.info("Init call for {}", APPLICATION_NAME);
+ ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+ configFileRepresentation.registerConfigChangedListener(this);
+
+ generalConfig = new GeneralConfig(configFileRepresentation);
+ if (generalConfig.getEnabled()) { //dmaapEnabled
+ mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig));
+ mountpointStatePublisher.start();
+ netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener);
+ netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener);
+ }
+ }
+
+ /**
+ * Reflect status for Unit Tests
+ * @return Text with status
+ */
+ public String isInitializationOk() {
+ return "No implemented";
+ }
+
+ @Override
+ public void onConfigChanged() {
+ LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
+ boolean dmaapEnabledNewVal = generalConfig.getEnabled();
+
+ // DMaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ if (!dmaapEnabled && dmaapEnabledNewVal) {
+ LOG.info("DMaaP is enabled, starting Publisher");
+ mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig));
+ mountpointStatePublisher.start();
+ netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener);
+ netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener);
+ } else if (dmaapEnabled && !dmaapEnabledNewVal) {
+ // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ LOG.info("DMaaP is disabled, stop publisher");
+ try {
+ MountpointStatePublisher.stopPublisher();
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception while stopping publisher ", e);
+ }
+ }
+ dmaapEnabled = dmaapEnabledNewVal;
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("{} closing ...", this.getClass().getName());
+ //close(updateService, configService, mwtnService); issue#1
+ try {
+ MountpointStatePublisher.stopPublisher();
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Exception while stopping publisher ", e);
+ }
+ //close(updateService, mwtnService);
+ LOG.info("{} closing done",APPLICATION_NAME);
+ }
+
+ /**
+ * Used to close all Services, that should support AutoCloseable Pattern
+ *
+ * @param toClose
+ * @throws Exception
+ */
+ @SuppressWarnings("unused")
+ private void close(AutoCloseable... toCloseList) throws Exception {
+ for (AutoCloseable element : toCloseList) {
+ if (element != null) {
+ element.close();
+ }
+ }
+ }
+
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
new file mode 100644
index 000000000..972c251e6
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
@@ -0,0 +1,121 @@
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+/*******************************************************************************
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ ******************************************************************************/
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MountpointStatePublisher implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
+ public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
+ static MRBatchingPublisher pub;
+ Properties publisherProperties = new Properties();
+ static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+
+
+ public MountpointStatePublisher(Configuration config) {
+ initialize(config);
+ }
+
+ public void initialize(Configuration config) {
+ LOG.info("In initializePublisher method of MountpointStatePublisher");
+ GeneralConfig generalCfg = (GeneralConfig)config;
+
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
+ publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence());
+
+ createPublisher(publisherProperties);
+ }
+
+ public MRBatchingPublisher createPublisher(Properties publisherProperties) {
+
+ try {
+ pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
+ return pub;
+ } catch (IOException e) {
+ LOG.info("Exception while creating a publisher", e);
+
+ }
+ return null;
+ }
+
+ public void publishMessage(MRBatchingPublisher pub, String msg) {
+ LOG.info("Publishing message {} - ", msg);
+ try {
+ pub.send(msg);
+ } catch (IOException e) {
+ LOG.info("Exception while publishing a mesage ", e);
+ }
+ }
+
+ public MRBatchingPublisher getPublisher() {
+ return pub;
+ }
+
+ public void run() {
+
+ while (!closePublisher) {
+ try {
+ if (stateObjects.size() > 0) {
+ JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst();
+ publishMessage(getPublisher(), obj.toString());
+ } else {
+ pauseThread();
+ }
+ } catch(Exception ex) {
+ LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
+ }
+
+ MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
+ LOG.debug("Response message = {} ",res.toString());
+ }
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause);
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ public static void stopPublisher() throws IOException, InterruptedException {
+ closePublisher = true;
+ pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close
+ }
+}