diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-08-18 15:30:11 +0200 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-08-18 16:05:02 +0200 |
commit | 6e164bc0f1f6688714033e95731942a89f5f1868 (patch) | |
tree | 45e1192ca4edd763bab0587da8652a3cba4af2e9 /sdnr/wt/mountpoint-state-provider/provider/src/main | |
parent | e0ba85f7ef526aaa270f8a6cd5baad8e32eb920e (diff) |
Develop a VES Provider
Common VES provider will be used by devicemanager bundles and other bundles for sending VES messages
Issue-ID: SDNC-1188
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: Ied23b82a528aac23d7bebab272a2f414e67d0866
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider/provider/src/main')
11 files changed, 547 insertions, 341 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java deleted file mode 100644 index 21ca9dae7..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; - -/** - * Configuration of mountpoint-state-provider, general section<br> - * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default - * Configuration properties if none exist or exist partially Generates Publisher properties only for - * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not - * generated by default. For a list of applicable properties for the different TranportType values, please see - - * https://wiki.onap.org/display/DW/Feature+configuration+requirements - */ -public class GeneralConfig implements Configuration { - - private static final String SECTION_MARKER = "general"; - - private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"disabled"; - - public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType"; - private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; - - public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; - private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmaap:3904"; - - public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; - private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; - - public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype"; - private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json"; - - public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout"; - private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000"; - - public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit"; - private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250"; - - public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance"; - public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50"; - - private final ConfigurationFileRepresentation configuration; - - public GeneralConfig(ConfigurationFileRepresentation configuration) { - this.configuration = configuration; - this.configuration.addSection(SECTION_MARKER); - defaults(); - } - - public Boolean getEnabled() { - Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); - return enabled; - } - - public String getHostPort() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); - } - - public String getTransportType() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); - } - - public String getTopic() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); - } - - public String getTimeout() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); - } - - public String getLimit() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); - } - - public String getContenttype() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); - } - - public String getMaxBatchSize() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); - } - - public String getMaxAgeMs() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); - } - - public String getMessageSentThreadOccurrence() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - - @Override - public String getSectionName() { - return SECTION_MARKER; - } - - @Override - public void defaults() { - // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, - DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, - DEFAULT_VALUE_PUBLISHER_HOST_PORT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, - DEFAULT_VALUE_PUBLISHER_TOPIC); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, - DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, - DEFAULT_VALUE_PUBLISHER_TIMEOUT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, - DEFAULT_VALUE_PUBLISHER_LIMIT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, - DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, - DEFAULT_VALUE_PUBLISHER_MAXAGEMS); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, - DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java index f9b7b1e6a..5cdf5abc6 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java @@ -34,14 +34,14 @@ import org.slf4j.LoggerFactory; public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class); private NetconfNodeStateService netconfNodeStateService; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; private ListenerRegistration<MountpointNodeConnectListenerImpl> registeredNodeConnectListener; public MountpointNodeConnectListenerImpl(NetconfNodeStateService netconfNodeStateService) { this.netconfNodeStateService = netconfNodeStateService; } - public void start(MountpointStatePublisherMain mountpointStatePublisher) { + public void start(MountpointStatePublisher mountpointStatePublisher) { this.mountpointStatePublisher = mountpointStatePublisher; registeredNodeConnectListener = netconfNodeStateService.registerNetconfNodeConnectListener(this); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java index bbfd87961..d8b5a85de 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java @@ -31,14 +31,14 @@ import org.slf4j.LoggerFactory; public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); private NetconfNodeStateService netconfNodeStateService; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; private ListenerRegistration<MountpointNodeStateListenerImpl> registeredNodeStateListener; public MountpointNodeStateListenerImpl(NetconfNodeStateService netconfNodeStateService) { this.netconfNodeStateService = netconfNodeStateService; } - public void start(MountpointStatePublisherMain mountpointStatePublisher) { + public void start(MountpointStatePublisher mountpointStatePublisher) { this.mountpointStatePublisher = mountpointStatePublisher; registeredNodeStateListener = netconfNodeStateService.registerNetconfNodeStateListener(this); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java index e31032393..6838bc3b2 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java @@ -24,27 +24,22 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; -import java.io.IOException; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener { +public class MountpointStateProviderImpl implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class); private static final String APPLICATION_NAME = "mountpoint-state-provider"; - private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties"; private NetconfNodeStateService netconfNodeStateService; - private GeneralConfig generalConfig; - private boolean dmaapEnabled = false; + private NetconfNetworkElementService netconfNetworkElementService; private MountpointNodeConnectListenerImpl nodeConnectListener; private MountpointNodeStateListenerImpl nodeStateListener; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; public MountpointStateProviderImpl() { LOG.info("Creating provider class for {}", APPLICATION_NAME); @@ -56,73 +51,41 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange this.netconfNodeStateService = netconfNodeStateService; } + public void setNetconfNetworkElementService(NetconfNetworkElementService netconfNetworkElementService) { + this.netconfNetworkElementService = netconfNetworkElementService; + } + public void init() { LOG.info("Init call for {}", APPLICATION_NAME); - ConfigurationFileRepresentation configFileRepresentation = - new ConfigurationFileRepresentation(CONFIGURATIONFILE); - configFileRepresentation.registerConfigChangedListener(this); nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateService); nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateService); - generalConfig = new GeneralConfig(configFileRepresentation); - if (generalConfig.getEnabled()) { //dmaapEnabled - startPublishing(); - } + startPublishing(); } /** * Reflect status for Unit Tests - * + * * @return Text with status */ public String isInitializationOk() { return "No implemented"; } - @Override - public void onConfigChanged() { - LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); - boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - - // DMaap disabled earlier (or during bundle startup) but enabled later, start publisher(s) - if (!dmaapEnabled && dmaapEnabledNewVal) { - LOG.info("DMaaP is enabled, starting Publisher"); - startPublishing(); - } else if (dmaapEnabled && !dmaapEnabledNewVal) { - // DMaap enabled earlier (or during bundle startup) but disabled later, stop publisher(s) - LOG.info("DMaaP is disabled, stop publisher"); - stopPublishing(); - } - dmaapEnabled = dmaapEnabledNewVal; - } - public void startPublishing() { - mountpointStatePublisher = new MountpointStatePublisherMain(generalConfig); - mountpointStatePublisher.start(); + mountpointStatePublisher = new MountpointStatePublisher(netconfNetworkElementService.getServiceProvider().getVESCollectorService()); + Thread t = new Thread(mountpointStatePublisher); + t.start(); nodeConnectListener.start(mountpointStatePublisher); nodeStateListener.start(mountpointStatePublisher); } - public void stopPublishing() { - try { - nodeConnectListener.stop(); - nodeStateListener.stop(); - mountpointStatePublisher.stop(); - } catch (Exception e) { - LOG.error("Exception while stopping publisher ", e); - } - } - @Override public void close() throws Exception { LOG.info("{} closing ...", this.getClass().getName()); - try { - mountpointStatePublisher.stop(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } + mountpointStatePublisher.stop(); close(nodeConnectListener, nodeStateListener); LOG.info("{} closing done", APPLICATION_NAME); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java new file mode 100644 index 000000000..9df37e305 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import java.util.LinkedList; +import java.util.List; +import org.eclipse.jdt.annotation.NonNull; +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MountpointStatePublisher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); + private List<JSONObject> stateObjects = new LinkedList<JSONObject>(); + private boolean publish = true; + private int publishPause = 5000; // Default pause between fetch - 5 seconds + private VESCollectorService vesCollectorService; + + public MountpointStatePublisher(@NonNull VESCollectorService vesCollectorService) { + this.vesCollectorService = vesCollectorService; + } + + public void addToPublish(JSONObject publishObj) { + getStateObjects().add(publishObj); + } + + public synchronized List<JSONObject> getStateObjects() { + return stateObjects; + } + + public void stop() { + publish = false; + } + + private void pauseThread() throws InterruptedException { + if (publishPause > 0) { + LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause); + Thread.sleep(publishPause); + } else { + LOG.debug("No data yet to publish. No publish pause specified - retrying immediately"); + } + } + + + public String createVESMessage(JSONObject msg, VESCollectorCfgService vesCfg) { + MountpointStateVESMessageFormatter vesFormatter = new MountpointStateVESMessageFormatter(vesCfg); + String vesMsg = vesFormatter.createVESMessage(msg); + return vesMsg; + } + + @Override + public void run() { + while (publish) { + try { + if (getStateObjects().size() > 0) { + JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst(); + String vesMsg = createVESMessage(obj, vesCollectorService.getConfig()); + this.vesCollectorService.publishVESMessage(vesMsg); + } else { + pauseThread(); + } + } catch (Exception ex) { + LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); + } + } + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java deleted file mode 100644 index 8d6e2d4a3..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MountpointStatePublisherMain { - - private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisherMain.class); - private Thread thread = null; - private MRBatchingPublisher pub = null; - private List<JSONObject> stateObjects = new LinkedList<JSONObject>(); - private Properties publisherProperties = new Properties(); - private boolean closePublisher = false; - private int publishPause = 5000; // Default pause between fetch - 5 seconds - - public MountpointStatePublisherMain(Configuration config) { - initialize(config); - } - - public void initialize(Configuration config) { - LOG.info("In initializePublisher method of MountpointStatePublisher"); - GeneralConfig generalCfg = (GeneralConfig) config; - - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, - generalCfg.getMessageSentThreadOccurrence()); - - createPublisher(publisherProperties); - } - - public MRBatchingPublisher createPublisher(Properties publisherProperties) { - - try { - pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); - return pub; - } catch (IOException e) { - LOG.info("Exception while creating a publisher", e); - } - return null; - } - - public void publishMessage(MRBatchingPublisher pub, String msg) { - LOG.info("Publishing message {} - ", msg); - try { - pub.send(msg); - } catch (IOException e) { - LOG.info("Exception while publishing a mesage ", e); - } - } - - public MRBatchingPublisher getPublisher() { - return pub; - } - - public void start() { - thread = new Thread(new MountpointStatePublisher()); - thread.start(); - } - - public void stop() throws IOException, InterruptedException { - closePublisher = true; - getPublisher().close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close) - } - - private void pauseThread() throws InterruptedException { - if (publishPause > 0) { - LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause); - Thread.sleep(publishPause); - } else { - LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); - } - } - - public void addToPublish(JSONObject publishObj) { - getStateObjects().add(publishObj); - } - - public List<JSONObject> getStateObjects() { - return stateObjects; - } - - public class MountpointStatePublisher implements Runnable { - - @Override - public void run() { - while (!closePublisher) { - try { - if (getStateObjects().size() > 0) { - JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst(); - publishMessage(getPublisher(), obj.toString()); - } else { - pauseThread(); - } - } catch (Exception ex) { - LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); - } - - MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called - LOG.debug("Response message = {} ", res.toString()); - } - - } - - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java new file mode 100644 index 000000000..918438e55 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Instant; +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MountpointStateVESMessageFormatter { + private static final Logger LOG = LoggerFactory.getLogger(MountpointStateVESMessageFormatter.class); + private static final String VES_DOMAIN = "notification"; + private static final String VES_PRIORITY = "Normal"; + private static final String VES_CHANGETYPE = "ConnectionState"; + + private VESCollectorCfgService vesCfg; + static long sequenceNo = 0; + + public MountpointStateVESMessageFormatter(VESCollectorCfgService vesCfg) { + this.vesCfg = vesCfg; + } + + public String createVESMessage(JSONObject obj) { + LOG.debug("JSON Object to format to VES is - {}", obj.toString()); + String vesMsg = "{}"; + sequenceNo++; + + VESCommonEventHeaderPOJO vesCommonEventHeader = createVESCommonEventHeader(obj); + VESNotificationFieldsPOJO vesNotificationFields = createVESNotificationFields(obj); + + VESEvent vesEvent = new VESEvent(); + vesEvent.addEventObjects(vesCommonEventHeader); + vesEvent.addEventObjects(vesNotificationFields); + + try { + ObjectMapper objMapper = new ObjectMapper(); + vesMsg = objMapper.writeValueAsString(vesEvent); + LOG.debug("VES message to be published - {}", vesMsg); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + return vesMsg; + + } + + private VESNotificationFieldsPOJO createVESNotificationFields(JSONObject obj) { + VESNotificationFieldsPOJO vesNotificationFields = new VESNotificationFieldsPOJO(); + + vesNotificationFields.setChangeIdentifier(obj.getString("NodeId")); + vesNotificationFields.setChangeType(VES_CHANGETYPE); + vesNotificationFields.setNewState(obj.getString("NetConfNodeState")); + + return vesNotificationFields; + } + + private VESCommonEventHeaderPOJO createVESCommonEventHeader(JSONObject obj) { + VESCommonEventHeaderPOJO vesCommonEventHeader = new VESCommonEventHeaderPOJO(); + + vesCommonEventHeader.setDomain(VES_DOMAIN); + vesCommonEventHeader + .setEventId(obj.getString("NodeId") + "_" + obj.getString("NetConfNodeState") + "_" + sequenceNo); + vesCommonEventHeader + .setEventName(obj.getString("NodeId") + "_" + obj.getString("NetConfNodeState") + "_" + sequenceNo); + vesCommonEventHeader.setSourceName(obj.getString("NodeId")); + vesCommonEventHeader.setPriority(VES_PRIORITY); + vesCommonEventHeader.setReportingEntityName(this.vesCfg.getReportingEntityName()); + vesCommonEventHeader.setSequence(sequenceNo); + + Instant time = (Instant) obj.get("TimeStamp"); + vesCommonEventHeader.setLastEpochMicrosec(time.toEpochMilli() * 100); + vesCommonEventHeader.setStartEpochMicrosec(time.toEpochMilli() * 100); + + return vesCommonEventHeader; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java new file mode 100644 index 000000000..7b15ae152 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java @@ -0,0 +1,190 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +@JsonPropertyOrder({"domain", "eventId", "eventName", "eventType", "lastEpochMicrosec", "nfcNamingCode", "nfNamingCode", "nfVendorName", "priority", "reportingEntityId", + "reportingEntityName", "sequence", "sourceId", "sourceName", "startEpochMicrosec", "timeZoneOffset", "version", "vesEventListenerVersion"}) +public class VESCommonEventHeaderPOJO { + + private String domain = ""; + private String eventId = ""; + private String eventName = ""; + private String eventType = ""; + private long sequence = 0L; + private String priority = ""; + @JsonIgnore + private String reportingEntityId = ""; + private String reportingEntityName = ""; + private String sourceId = ""; + private String sourceName = ""; + private long startEpochMicrosec = 0L; + private long lastEpochMicrosec = 0L; + private String nfcNamingCode = ""; + private String nfNamingCode = ""; + private String nfVendorName = ""; + private String timeZoneOffset = "+00:00"; + private String version = "4.1"; + private String vesEventListenerVersion = "7.1.1"; + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getEventId() { + return eventId; + } + + public void setEventId(String eventId) { + this.eventId = eventId; + } + + public String getEventName() { + return eventName; + } + + public void setEventName(String eventName) { + this.eventName = eventName; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public Long getSequence() { + return sequence; + } + + public void setSequence(long sequenceNo) { + this.sequence = sequenceNo; + } + + public String getPriority() { + return priority; + } + + public void setPriority(String priority) { + this.priority = priority; + } + + public String getReportingEntityId() { + return reportingEntityId; + } + + public void setReportingEntityId(String reportingEntityId) { + this.reportingEntityId = reportingEntityId; + } + + public String getReportingEntityName() { + return reportingEntityName; + } + + public void setReportingEntityName(String reportingEntityName) { + this.reportingEntityName = reportingEntityName; + } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public long getStartEpochMicrosec() { + return startEpochMicrosec; + } + + public void setStartEpochMicrosec(long startEpochMicrosec) { + this.startEpochMicrosec = startEpochMicrosec; + } + + public long getLastEpochMicrosec() { + return lastEpochMicrosec; + } + + public void setLastEpochMicrosec(long lastEpochMicrosec) { + this.lastEpochMicrosec = lastEpochMicrosec; + } + + public String getNfcNamingCode() { + return nfcNamingCode; + } + + public void setNfcNamingCode(String nfcNamingCode) { + this.nfcNamingCode = nfcNamingCode; + } + + public String getNfNamingCode() { + return nfNamingCode; + } + + public void setNfNamingCode(String nfNamingCode) { + this.nfNamingCode = nfNamingCode; + } + + public String getNfVendorName() { + return nfVendorName; + } + + public void setNfVendorName(String nfVendorName) { + this.nfVendorName = nfVendorName; + } + + public String getTimeZoneOffset() { + return timeZoneOffset; + } + + public void setTimeZoneOffset(String timeZoneOffset) { + this.timeZoneOffset = timeZoneOffset; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getVesEventListenerVersion() { + return vesEventListenerVersion; + } + + public void setVesEventListenerVersion(String vesEventListenerVersion) { + this.vesEventListenerVersion = vesEventListenerVersion; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java new file mode 100644 index 000000000..13017d46c --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import java.util.HashMap; +import java.util.Map; + +public class VESEvent { + public Map<String, Object> event = new HashMap<String, Object>(); + + public void addEventObjects(Object eventObject) { + if (eventObject instanceof VESCommonEventHeaderPOJO) + event.put("commonEventHeader", eventObject); + else if (eventObject instanceof VESNotificationFieldsPOJO) + event.put("notificationFields", eventObject); + + } + + public Map<String, Object> getEvent() { + return event; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java new file mode 100644 index 000000000..1963cd6ca --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java @@ -0,0 +1,108 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.ArrayList; +import java.util.HashMap; + +@JsonPropertyOrder({"arrayOfNamedHashMap", "changeContact", "changeIdentifier", "changeType", "newState", "oldState", "notificationFieldsVersion"}) +public class VESNotificationFieldsPOJO { + + private ArrayList<HashMap<String, Object>> arrayOfNamedHashMap = new ArrayList<HashMap<String, Object>>(); + @JsonIgnore + private HashMap<String, Object> namedHashMap = new HashMap<String, Object>(); + @JsonIgnore + private HashMap<String, String> hashMap = new HashMap<String, String>(); + @JsonIgnore + private String changeContact = ""; + private String changeIdentifier = ""; + private String changeType = ""; + //@JsonIgnore + private String newState = ""; + @JsonIgnore + private String oldState = ""; + @JsonIgnore + private String stateInterface = ""; + private String notificationFieldsVersion = "2.0"; + + public ArrayList<HashMap<String, Object>> getArrayOfNamedHashMap() { + return arrayOfNamedHashMap; + } + + public void setArrayOfNamedHashMap(ArrayList<HashMap<String, Object>> arrayOfNamedHashMap) { + this.arrayOfNamedHashMap = arrayOfNamedHashMap; + } + + public String getChangeContact() { + return changeContact; + } + + public void setChangeContact(String changeContact) { + this.changeContact = changeContact; + } + + public String getChangeIdentifier() { + return changeIdentifier; + } + + public void setChangeIdentifier(String changeIdentifier) { + this.changeIdentifier = changeIdentifier; + } + + public String getChangeType() { + return changeType; + } + + public void setChangeType(String changeType) { + this.changeType = changeType; + } + + public String getNewState() { + return newState; + } + + public void setNewState(String newState) { + this.newState = newState; + } + + public String getOldState() { + return oldState; + } + + public void setOldState(String oldState) { + this.oldState = oldState; + } + + public String getStateInterface() { + return stateInterface; + } + + public void setStateInterface(String stateInterface) { + this.stateInterface = stateInterface; + } + + public String getNotificationFieldsVersion() { + return notificationFieldsVersion; + } + + public void setNotificationFieldsVersion(String notificationFieldsVersion) { + this.notificationFieldsVersion = notificationFieldsVersion; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml index 3ec9efbdd..5512359df 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml @@ -29,9 +29,14 @@ <reference id="netconfNodeStateService" availability="mandatory" activation="eager" interface="org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService"/> + + <reference id="netconfNetworkElementService" + availability="mandatory" activation="eager" + interface="org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService"/> <bean id="provider" class="org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStateProviderImpl" init-method="init" destroy-method="close"> <property name="netconfNodeStateService" ref="netconfNodeStateService"/> + <property name="netconfNetworkElementService" ref="netconfNetworkElementService"/> </bean> </blueprint> |