diff options
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main/java')
5 files changed, 0 insertions, 516 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java deleted file mode 100644 index 830346027..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java +++ /dev/null @@ -1,132 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - ******************************************************************************/ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; - -/** - * Configuration of mountpoint-state-provider, general section<br> - * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not - * Generates default Configuration properties if none exist or exist partially - * Generates Publisher properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not - * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements - */ -public class GeneralConfig implements Configuration { - - private static final String SECTION_MARKER = "general"; - - private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"disabled"; - - public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType"; - private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; - - public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; - private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904"; - - public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; - private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; - - public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype"; - private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json"; - - public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout"; - private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000"; - - public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit"; - private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250"; - - public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance"; - public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50"; - - private final ConfigurationFileRepresentation configuration; - - public GeneralConfig(ConfigurationFileRepresentation configuration) { - this.configuration = configuration; - this.configuration.addSection(SECTION_MARKER); - defaults(); - } - - public Boolean getEnabled() { - Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); - return enabled; - } - - public String getHostPort() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); - } - - public String getTransportType() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); - } - - public String getTopic() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); - } - - public String getTimeout() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); - } - - public String getLimit() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); - } - - public String getContenttype() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); - } - - public String getMaxBatchSize() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); - } - - public String getMaxAgeMs() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); - } - - public String getMessageSentThreadOccurrence() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - - @Override - public String getSectionName() { - return SECTION_MARKER; - } - - @Override - public void defaults() { - // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, DEFAULT_VALUE_PUBLISHER_HOST_PORT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, DEFAULT_VALUE_PUBLISHER_TOPIC); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, DEFAULT_VALUE_PUBLISHER_TIMEOUT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, DEFAULT_VALUE_PUBLISHER_LIMIT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, DEFAULT_VALUE_PUBLISHER_MAXAGEMS); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java deleted file mode 100644 index f0bf4961e..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java +++ /dev/null @@ -1,66 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - ******************************************************************************/ - -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import org.json.JSONObject; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener; -import org.opendaylight.mdsal.binding.api.DataBroker; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener { - private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class); - - @Override - public void onEnterConnected(NodeId nNodeId, NetconfNode netconfNode, DataBroker netconfNodeDataBroker) { - - LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); - - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - - MountpointStatePublisher.stateObjects.add(obj); - } - - @Override - public void onLeaveConnected(NodeId nNodeId) { - - LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId); - - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", "Unmounted"); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - - MountpointStatePublisher.stateObjects.add(obj); - } - - public void close() throws Exception { - - LOG.debug("In close of MountpointNodeConnectListenerImpl"); - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java deleted file mode 100644 index fb2651ee4..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - ******************************************************************************/ - -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import org.json.JSONObject; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; -import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeConnectionStatus.ConnectionStatus; -import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener { - private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); - @Override - public void onCreated(NodeId nNodeId, NetconfNode netconfNode) { - - LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().toString()); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - - MountpointStatePublisher.stateObjects.add(obj); - } - - @Override - public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) { - - LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - - MountpointStatePublisher.stateObjects.add(obj); - } - - @Override - public void onRemoved(NodeId nNodeId) { - - LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", "Removed"); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - - MountpointStatePublisher.stateObjects.add(obj); - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java deleted file mode 100644 index 5ffae29f4..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java +++ /dev/null @@ -1,126 +0,0 @@ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; -/******************************************************************************* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - ******************************************************************************/ - -import java.io.IOException; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; -import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("deprecation") -public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener { - - private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class); - private static final String APPLICATION_NAME = "mountpoint-state-provider"; - private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties"; - - private NetconfNodeStateService netconfNodeStateService; - - private GeneralConfig generalConfig; - private boolean dmaapEnabled = false; - private Thread mountpointStatePublisher = null; - - MountpointNodeConnectListenerImpl nodeConnectListener = new MountpointNodeConnectListenerImpl(); - MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl(); - - public MountpointStateProviderImpl() { - LOG.info("Creating provider class for {}", APPLICATION_NAME); - } - - public void setNetconfNodeStateService(NetconfNodeStateService netconfNodeStateService) { - this.netconfNodeStateService = netconfNodeStateService; - } - - public void init() { - LOG.info("Init call for {}", APPLICATION_NAME); - ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE); - configFileRepresentation.registerConfigChangedListener(this); - - generalConfig = new GeneralConfig(configFileRepresentation); - if (generalConfig.getEnabled()) { //dmaapEnabled - mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); - mountpointStatePublisher.start(); - netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); - netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); - } - } - - /** - * Reflect status for Unit Tests - * @return Text with status - */ - public String isInitializationOk() { - return "No implemented"; - } - - @Override - public void onConfigChanged() { - LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); - boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - - // DMaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s) - if (!dmaapEnabled && dmaapEnabledNewVal) { - LOG.info("DMaaP is enabled, starting Publisher"); - mountpointStatePublisher = new Thread(new MountpointStatePublisher(generalConfig)); - mountpointStatePublisher.start(); - netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); - netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); - } else if (dmaapEnabled && !dmaapEnabledNewVal) { - // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) - LOG.info("DMaaP is disabled, stop publisher"); - try { - MountpointStatePublisher.stopPublisher(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } - } - dmaapEnabled = dmaapEnabledNewVal; - } - - @Override - public void close() throws Exception { - LOG.info("{} closing ...", this.getClass().getName()); - //close(updateService, configService, mwtnService); issue#1 - try { - MountpointStatePublisher.stopPublisher(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } - //close(updateService, mwtnService); - LOG.info("{} closing done",APPLICATION_NAME); - } - - /** - * Used to close all Services, that should support AutoCloseable Pattern - * - * @param toClose - * @throws Exception - */ - @SuppressWarnings("unused") - private void close(AutoCloseable... toCloseList) throws Exception { - for (AutoCloseable element : toCloseList) { - if (element != null) { - element.close(); - } - } - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java deleted file mode 100644 index 972c251e6..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; -/******************************************************************************* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - ******************************************************************************/ - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.json.JSONObject; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class MountpointStatePublisher implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); - public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>(); - static MRBatchingPublisher pub; - Properties publisherProperties = new Properties(); - static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl - private int fetchPause = 5000; // Default pause between fetch - 5 seconds - - - public MountpointStatePublisher(Configuration config) { - initialize(config); - } - - public void initialize(Configuration config) { - LOG.info("In initializePublisher method of MountpointStatePublisher"); - GeneralConfig generalCfg = (GeneralConfig)config; - - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence()); - - createPublisher(publisherProperties); - } - - public MRBatchingPublisher createPublisher(Properties publisherProperties) { - - try { - pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); - return pub; - } catch (IOException e) { - LOG.info("Exception while creating a publisher", e); - - } - return null; - } - - public void publishMessage(MRBatchingPublisher pub, String msg) { - LOG.info("Publishing message {} - ", msg); - try { - pub.send(msg); - } catch (IOException e) { - LOG.info("Exception while publishing a mesage ", e); - } - } - - public MRBatchingPublisher getPublisher() { - return pub; - } - - public void run() { - - while (!closePublisher) { - try { - if (stateObjects.size() > 0) { - JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst(); - publishMessage(getPublisher(), obj.toString()); - } else { - pauseThread(); - } - } catch(Exception ex) { - LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); - } - - MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called - LOG.debug("Response message = {} ",res.toString()); - } - } - - private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause); - Thread.sleep(fetchPause); - } else { - LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); - } - } - - public static void stopPublisher() throws IOException, InterruptedException { - closePublisher = true; - pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close - } -} |