diff options
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main')
5 files changed, 229 insertions, 215 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java index 830346027..675ac8a2f 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * ============LICENSE_START======================================================================== * ONAP : ccsdk feature sdnr wt * ================================================================================================= @@ -14,7 +14,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. * ============LICENSE_END========================================================================== - ******************************************************************************/ + */ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; @@ -22,111 +22,121 @@ import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRep /** * Configuration of mountpoint-state-provider, general section<br> - * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not - * Generates default Configuration properties if none exist or exist partially - * Generates Publisher properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not - * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements + * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default + * Configuration properties if none exist or exist partially Generates Publisher properties only for + * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not + * generated by default. For a list of applicable properties for the different TranportType values, please see - + * https://wiki.onap.org/display/DW/Feature+configuration+requirements */ public class GeneralConfig implements Configuration { private static final String SECTION_MARKER = "general"; - private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"disabled"; - + private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"disabled"; + public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType"; private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; - + public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmap:3904"; - + public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; - + public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype"; private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json"; - + public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout"; private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000"; public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit"; private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000"; - + public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize"; public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100"; - + public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs"; public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250"; - + public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance"; public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50"; - private final ConfigurationFileRepresentation configuration; - - public GeneralConfig(ConfigurationFileRepresentation configuration) { - this.configuration = configuration; - this.configuration.addSection(SECTION_MARKER); - defaults(); - } - - public Boolean getEnabled() { - Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); - return enabled; - } - - public String getHostPort() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); - } - - public String getTransportType() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); - } - - public String getTopic() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); - } - - public String getTimeout() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); - } - - public String getLimit() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); - } - - public String getContenttype() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); - } - - public String getMaxBatchSize() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); - } - - public String getMaxAgeMs() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); - } - - public String getMessageSentThreadOccurrence() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - - @Override - public String getSectionName() { - return SECTION_MARKER; - } - - @Override - public void defaults() { - // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, DEFAULT_VALUE_PUBLISHER_HOST_PORT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, DEFAULT_VALUE_PUBLISHER_TOPIC); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, DEFAULT_VALUE_PUBLISHER_TIMEOUT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, DEFAULT_VALUE_PUBLISHER_LIMIT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, DEFAULT_VALUE_PUBLISHER_MAXAGEMS); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } + private final ConfigurationFileRepresentation configuration; + + public GeneralConfig(ConfigurationFileRepresentation configuration) { + this.configuration = configuration; + this.configuration.addSection(SECTION_MARKER); + defaults(); + } + + public Boolean getEnabled() { + Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); + return enabled; + } + + public String getHostPort() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); + } + + public String getTransportType() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); + } + + public String getTopic() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); + } + + public String getTimeout() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); + } + + public String getLimit() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); + } + + public String getContenttype() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); + } + + public String getMaxBatchSize() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); + } + + public String getMaxAgeMs() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); + } + + public String getMessageSentThreadOccurrence() { + return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); + } + + @Override + public String getSectionName() { + return SECTION_MARKER; + } + + @Override + public void defaults() { + // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, + DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, + DEFAULT_VALUE_PUBLISHER_HOST_PORT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, + DEFAULT_VALUE_PUBLISHER_TOPIC); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, + DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, + DEFAULT_VALUE_PUBLISHER_TIMEOUT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, + DEFAULT_VALUE_PUBLISHER_LIMIT); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, + DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, + DEFAULT_VALUE_PUBLISHER_MAXAGEMS); + configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, + DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); + } } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java index 9d7ce090d..48cb76ead 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * ============LICENSE_START======================================================================== * ONAP : ccsdk feature sdnr wt * ================================================================================================= @@ -14,7 +14,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. * ============LICENSE_END========================================================================== - ******************************************************************************/ + */ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; @@ -38,8 +38,8 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList NetconfNode netconfNode = accessor.getNetconfNode(); //, MountPoint mountpoint, DataBroker netconfNodeDataBroker; - LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); + LOG.debug("In onEnterConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId.getValue() + + " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); JSONObject obj = new JSONObject(); obj.put("NodeId", nNodeId.getValue()); @@ -52,7 +52,7 @@ public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectList @Override public void onLeaveConnected(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) { - LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = "+nNodeId); + LOG.debug("In onLeaveConnected of MountpointNodeConnectListenerImpl - nNodeId = " + nNodeId); JSONObject obj = new JSONObject(); obj.put("NodeId", nNodeId.getValue()); diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java index fb2651ee4..b7d76f38d 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/* * ============LICENSE_START======================================================================== * ONAP : ccsdk feature sdnr wt * ================================================================================================= @@ -14,7 +14,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. * ============LICENSE_END========================================================================== - ******************************************************************************/ + */ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; @@ -29,43 +29,44 @@ import org.slf4j.LoggerFactory; public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener { - private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); - @Override - public void onCreated(NodeId nNodeId, NetconfNode netconfNode) { + private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); - LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().toString()); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + @Override + public void onCreated(NodeId nNodeId, NetconfNode netconfNode) { - MountpointStatePublisher.stateObjects.add(obj); - } + LOG.info("In onCreated of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId.getValue() + " IP Address = " + + netconfNode.getHost().getIpAddress().getIpv4Address().toString()); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - @Override - public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) { + MountpointStatePublisher.stateObjects.add(obj); + } - LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId.getValue()+ - " IP Address = "+netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + @Override + public void onStateChange(NodeId nNodeId, NetconfNode netconfNode) { - MountpointStatePublisher.stateObjects.add(obj); - } + LOG.info("In onStateChange of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId.getValue() + + " IP Address = " + netconfNode.getHost().getIpAddress().getIpv4Address().getValue()); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", netconfNode.getConnectionStatus().toString()); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); - @Override - public void onRemoved(NodeId nNodeId) { + MountpointStatePublisher.stateObjects.add(obj); + } - LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = "+nNodeId); - JSONObject obj = new JSONObject(); - obj.put("NodeId", nNodeId.getValue()); - obj.put("NetConfNodeState", "Removed"); - obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + @Override + public void onRemoved(NodeId nNodeId) { - MountpointStatePublisher.stateObjects.add(obj); - } + LOG.info("In onRemoved of MountpointNodeStateListenerImpl - nNodeId = " + nNodeId); + JSONObject obj = new JSONObject(); + obj.put("NodeId", nNodeId.getValue()); + obj.put("NetConfNodeState", "Removed"); + obj.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + MountpointStatePublisher.stateObjects.add(obj); + } } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java index 0223e7323..cb5cbe3e2 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java @@ -58,7 +58,8 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange public void init() { LOG.info("Init call for {}", APPLICATION_NAME); - ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE); + ConfigurationFileRepresentation configFileRepresentation = + new ConfigurationFileRepresentation(CONFIGURATIONFILE); configFileRepresentation.registerConfigChangedListener(this); generalConfig = new GeneralConfig(configFileRepresentation); @@ -72,6 +73,7 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange /** * Reflect status for Unit Tests + * * @return Text with status */ public String isInitializationOk() { @@ -91,13 +93,13 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange netconfNodeStateService.registerNetconfNodeConnectListener(nodeConnectListener); netconfNodeStateService.registerNetconfNodeStateListener(nodeStateListener); } else if (dmaapEnabled && !dmaapEnabledNewVal) { - // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) + // DMaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s) LOG.info("DMaaP is disabled, stop publisher"); try { - MountpointStatePublisher.stopPublisher(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } + MountpointStatePublisher.stopPublisher(); + } catch (IOException | InterruptedException e) { + LOG.error("Exception while stopping publisher ", e); + } } dmaapEnabled = dmaapEnabledNewVal; } @@ -107,12 +109,12 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange LOG.info("{} closing ...", this.getClass().getName()); //close(updateService, configService, mwtnService); issue#1 try { - MountpointStatePublisher.stopPublisher(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } + MountpointStatePublisher.stopPublisher(); + } catch (IOException | InterruptedException e) { + LOG.error("Exception while stopping publisher ", e); + } //close(updateService, mwtnService); - LOG.info("{} closing done",APPLICATION_NAME); + LOG.info("{} closing done", APPLICATION_NAME); } /** diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java index 30857dec5..7f9fb2370 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java @@ -41,88 +41,89 @@ import org.slf4j.LoggerFactory; public class MountpointStatePublisher implements Runnable { - private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); - public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>(); - static MRBatchingPublisher pub; - Properties publisherProperties = new Properties(); - static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl - private int fetchPause = 5000; // Default pause between fetch - 5 seconds - - - public MountpointStatePublisher(Configuration config) { - initialize(config); - } - - public void initialize(Configuration config) { - LOG.info("In initializePublisher method of MountpointStatePublisher"); - GeneralConfig generalCfg = (GeneralConfig)config; - - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, generalCfg.getMessageSentThreadOccurrence()); - - createPublisher(publisherProperties); - } - - public MRBatchingPublisher createPublisher(Properties publisherProperties) { - - try { - pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); - return pub; - } catch (IOException e) { - LOG.info("Exception while creating a publisher", e); - - } - return null; - } - - public void publishMessage(MRBatchingPublisher pub, String msg) { - LOG.info("Publishing message {} - ", msg); - try { - pub.send(msg); - } catch (IOException e) { - LOG.info("Exception while publishing a mesage ", e); - } - } - - public MRBatchingPublisher getPublisher() { - return pub; - } - - public void run() { - - while (!closePublisher) { - try { - if (stateObjects.size() > 0) { - JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst(); - publishMessage(getPublisher(), obj.toString()); - } else { - pauseThread(); - } - } catch(Exception ex) { - LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); - } - - MRPublisherResponse res= pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called - LOG.debug("Response message = {} ",res.toString()); - } - } - - private void pauseThread() throws InterruptedException { - if (fetchPause > 0) { - LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause); - Thread.sleep(fetchPause); - } else { - LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); - } - } - - public static void stopPublisher() throws IOException, InterruptedException { - closePublisher = true; - pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close - } + private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); + public static final List<JSONObject> stateObjects = new LinkedList<JSONObject>(); + static MRBatchingPublisher pub; + Properties publisherProperties = new Properties(); + static boolean closePublisher = false; //Set this to true in the "Close" method of MountpointStateProviderImpl + private int fetchPause = 5000; // Default pause between fetch - 5 seconds + + + public MountpointStatePublisher(Configuration config) { + initialize(config); + } + + public void initialize(Configuration config) { + LOG.info("In initializePublisher method of MountpointStatePublisher"); + GeneralConfig generalCfg = (GeneralConfig) config; + + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); + publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, + generalCfg.getMessageSentThreadOccurrence()); + + createPublisher(publisherProperties); + } + + public MRBatchingPublisher createPublisher(Properties publisherProperties) { + + try { + pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); + return pub; + } catch (IOException e) { + LOG.info("Exception while creating a publisher", e); + + } + return null; + } + + public void publishMessage(MRBatchingPublisher pub, String msg) { + LOG.info("Publishing message {} - ", msg); + try { + pub.send(msg); + } catch (IOException e) { + LOG.info("Exception while publishing a mesage ", e); + } + } + + public MRBatchingPublisher getPublisher() { + return pub; + } + + public void run() { + + while (!closePublisher) { + try { + if (stateObjects.size() > 0) { + JSONObject obj = ((LinkedList<JSONObject>) stateObjects).removeFirst(); + publishMessage(getPublisher(), obj.toString()); + } else { + pauseThread(); + } + } catch (Exception ex) { + LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); + } + + MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called + LOG.debug("Response message = {} ", res.toString()); + } + } + + private void pauseThread() throws InterruptedException { + if (fetchPause > 0) { + LOG.debug("No data yet to publish. Pausing {} ms before retry ", fetchPause); + Thread.sleep(fetchPause); + } else { + LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); + } + } + + public static void stopPublisher() throws IOException, InterruptedException { + closePublisher = true; + pub.close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close + } } |