diff options
author | herbert <herbert.eiselt@highstreet-technologies.com> | 2020-01-30 11:46:02 +0100 |
---|---|---|
committer | Herbert Eiselt <herbert.eiselt@highstreet-technologies.com> | 2020-02-01 12:37:42 +0000 |
commit | 8fb01420d6e5b5c3284da57292e28ce40874aaf4 (patch) | |
tree | c5491963ca8941ea845426651d92b3791f0edfbb /sdnr/wt/mountpoint-state-provider/provider/src/main/java | |
parent | cbbc3520048bbc539b137cac011776506b3e0f70 (diff) |
SDN-R add updated app
add mountpoint-state-provider
Issue-ID: SDNC-1038
Signed-off-by: herbert <herbert.eiselt@highstreet-technologies.com>
Change-Id: Ib14ec872962dfd545e4f130729d3bb5a5716fde9
Signed-off-by: herbert <herbert.eiselt@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main/java')
5 files changed, 521 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java new file mode 100644 index 000000000..830346027 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + ******************************************************************************/ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; + +/** + * Configuration of mountpoint-state-provider, general section<br> + * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not + * Generates default Configuration properties if none exist or exist partially + * Generates Publisher properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not + * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements + */ +public class GeneralConfig implements Configuration { + + private static final String SECTION_MARKER = "general"; + + private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"disabled"; + + public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType"; + private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; + + public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; + private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904"; + + public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; + private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; + + public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype"; + private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json"; + + public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout"; + private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000"; + + public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit"; + private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000"; + + public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize"; + public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100"; + + public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs"; + public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250"; + + public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance"; + public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50"; + + private final ConfigurationFileRepresentation configuration; + + public GeneralConfig(ConfigurationFileRepresentation configuration) { + this.configuration = configuration; + this.configuration.addSection(SECTION_MARKER); + defaults(); + } + + public Boolean getEnabled() { + Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); + return enabled; + } + + public String getHostPort() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); + } + + public String getTransportType() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); + } + + public String getTopic() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); + } + + public String getTimeout() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); + } + + public String getLimit() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); + } + + public String getContenttype() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); + } + + public String getMaxBatchSize() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); + } + + public String getMaxAgeMs() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); + } + + public String getMessageSentThreadOccurrence() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); + } + + @Override + public String getSectionName() { + return SECTION_MARKER; + } + + @Override + public void defaults() { + // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, DEFAULT_VALUE_PUBLISHER_HOST_PORT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, DEFAULT_VALUE_PUBLISHER_TOPIC); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, DEFAULT_VALUE_PUBLISHER_TIMEOUT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, DEFAULT_VALUE_PUBLISHER_LIMIT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, DEFAULT_VALUE_PUBLISHER_MAXAGEMS); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); + } + +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java new file mode 100644 index 000000000..9d7ce090d --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + ******************************************************************************/ + +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener { + private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class); + + @Override + public void onEnterConnected(@NonNull NetconfAccessor accessor) { + NodeId nNodeId = accessor.getNodeId(); + NetconfNode netconfNode = accessor.getNetconfNode(); + //, MountPoint mountpoint, DataBroker netconfNodeDataBroker; + + LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId.getValue()+ + " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); + + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } + + @Override + public void onLeaveConnected(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) { + + LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId); + + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", "Unmounted"); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } + + @Override + public void close() throws Exception { + + LOG.debug("In close of MountpointNodeConnectListenerImpl"); + } + +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java new file mode 100644 index 000000000..fb2651ee4 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + ******************************************************************************/ + +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener { + private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); + @Override + public void onCreated(NodeId nNodeId, NetconfNode netconfNode) { + + LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ + " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().toString()); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } + + @Override + public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) { + + LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ + " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } + + @Override + public void onRemoved(NodeId nNodeId) { + + LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", "Removed"); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } + +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java new file mode 100644 index 000000000..5ffae29f4 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java @@ -0,0 +1,126 @@ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; +/******************************************************************************* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + ******************************************************************************/ + +import java.io.IOException; + +import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("deprecation") +public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener { + + private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class); + private static final String APPLICATION_NAME = "mountpoint-state-provider"; + private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties"; + + private NetconfNodeStateService netconfNodeStateService; + + private GeneralConfig generalConfig; + private boolean dmaapEnabled = false; + private Thread mountpointStatePublisher = null; + + MountpointNodeConnectListenerImpl nodeConnectListener = new MountpointNodeConnectListenerImpl(); + MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl(); + + public MountpointStateProviderImpl() { + LOG.info("Creating provider class for {}", APPLICATION_NAME); + } + + public void setNetconfNodeStateService(NetconfNodeStateService netconfNodeStateService) { + this.netconfNodeStateService = netconfNodeStateService; + } + + public void init() { + LOG.info("Init call for {}", APPLICATION_NAME); + ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE); + configFileRepresentation.registerConfigChangedListener(this); + + generalConfig = new GeneralConfig(configFileRepresentation); + if (generalConfig.getEnabled()) { //dmaapEnabled + mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); + mountpointStatePublisher.start(); + netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); + netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); + } + } + + /** + * Reflect status for Unit Tests + * @return Text with status + */ + public String isInitializationOk() { + return "No implemented"; + } + + @Override + public void onConfigChanged() { + LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); + boolean dmaapEnabledNewVal = generalConfig.getEnabled(); + + // DMaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s) + if (!dmaapEnabled && dmaapEnabledNewVal) { + LOG.info("DMaaP is enabled, starting Publisher"); + mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); + mountpointStatePublisher.start(); + netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); + netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); + } else if (dmaapEnabled && !dmaapEnabledNewVal) { + // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) + LOG.info("DMaaP is disabled, stop publisher"); + try { + MountpointStatePublisher.stopPublisher(); + } catch (IOException | InterruptedException e) { + LOG.error("Exception while stopping publisher ", e); + } + } + dmaapEnabled = dmaapEnabledNewVal; + } + + @Override + public void close() throws Exception { + LOG.info("{} closing ...", this.getClass().getName()); + //close(updateService, configService, mwtnService); issue#1 + try { + MountpointStatePublisher.stopPublisher(); + } catch (IOException | InterruptedException e) { + LOG.error("Exception while stopping publisher ", e); + } + //close(updateService, mwtnService); + LOG.info("{} closing done",APPLICATION_NAME); + } + + /** + * Used to close all Services, that should support AutoCloseable Pattern + * + * @param toClose + * @throws Exception + */ + @SuppressWarnings("unused") + private void close(AutoCloseable... toCloseList) throws Exception { + for (AutoCloseable element : toCloseList) { + if (element != null) { + element.close(); + } + } + } + +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java new file mode 100644 index 000000000..972c251e6 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java @@ -0,0 +1,121 @@ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; +/******************************************************************************* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + ******************************************************************************/ + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; +import org.onap.dmaap.mr.client.MRBatchingPublisher; +import org.onap.dmaap.mr.client.MRClientFactory; +import org.onap.dmaap.mr.client.response.MRPublisherResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MountpointStatePublisher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); + public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>(); + static MRBatchingPublisher pub; + Properties publisherProperties = new Properties(); + static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl + private int fetchPause = 5000; // Default pause between fetch - 5 seconds + + + public MountpointStatePublisher(Configuration config) { + initialize(config); + } + + public void initialize(Configuration config) { + LOG.info("In initializePublisher method of MountpointStatePublisher"); + GeneralConfig generalCfg = (GeneralConfig)config; + + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence()); + + createPublisher(publisherProperties); + } + + public MRBatchingPublisher createPublisher(Properties publisherProperties) { + + try { + pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); + return pub; + } catch (IOException e) { + LOG.info("Exception while creating a publisher", e); + + } + return null; + } + + public void publishMessage(MRBatchingPublisher pub, String msg) { + LOG.info("Publishing message {} - ", msg); + try { + pub.send(msg); + } catch (IOException e) { + LOG.info("Exception while publishing a mesage ", e); + } + } + + public MRBatchingPublisher getPublisher() { + return pub; + } + + public void run() { + + while (!closePublisher) { + try { + if (stateObjects.size() > 0) { + JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst(); + publishMessage(getPublisher(), obj.toString()); + } else { + pauseThread(); + } + } catch(Exception ex) { + LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); + } + + MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called + LOG.debug("Response message = {} ",res.toString()); + } + } + + private void pauseThread() throws InterruptedException { + if (fetchPause > 0) { + LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause); + Thread.sleep(fetchPause); + } else { + LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); + } + } + + public static void stopPublisher() throws IOException, InterruptedException { + closePublisher = true; + pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close + } +} |