diff options
author | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-08-18 15:30:11 +0200 |
---|---|---|
committer | Ravi Pendurty <ravi.pendurty@highstreet-technologies.com> | 2020-08-18 16:05:02 +0200 |
commit | 6e164bc0f1f6688714033e95731942a89f5f1868 (patch) | |
tree | 45e1192ca4edd763bab0587da8652a3cba4af2e9 /sdnr/wt/mountpoint-state-provider | |
parent | e0ba85f7ef526aaa270f8a6cd5baad8e32eb920e (diff) |
Develop a VES Provider
Common VES provider will be used by devicemanager bundles and other bundles for sending VES messages
Issue-ID: SDNC-1188
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: Ied23b82a528aac23d7bebab272a2f414e67d0866
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-state-provider')
17 files changed, 660 insertions, 711 deletions
diff --git a/sdnr/wt/mountpoint-state-provider/provider/pom.xml b/sdnr/wt/mountpoint-state-provider/provider/pom.xml index 294a33ae2..f1d221e29 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/pom.xml +++ b/sdnr/wt/mountpoint-state-provider/provider/pom.xml @@ -69,6 +69,12 @@ </dependency> <dependency> <groupId>org.onap.ccsdk.features.sdnr.wt</groupId> + <artifactId>sdnr-wt-devicemanager-model</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.ccsdk.features.sdnr.wt</groupId> <artifactId>sdnr-wt-netconfnode-state-service-model</artifactId> <version>${project.version}</version> </dependency> diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java deleted file mode 100644 index 21ca9dae7..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; - -/** - * Configuration of mountpoint-state-provider, general section<br> - * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default - * Configuration properties if none exist or exist partially Generates Publisher properties only for - * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not - * generated by default. For a list of applicable properties for the different TranportType values, please see - - * https://wiki.onap.org/display/DW/Feature+configuration+requirements - */ -public class GeneralConfig implements Configuration { - - private static final String SECTION_MARKER = "general"; - - private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"disabled"; - - public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType"; - private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH"; - - public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host"; - private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmaap:3904"; - - public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic"; - private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO"; - - public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype"; - private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json"; - - public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout"; - private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000"; - - public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit"; - private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100"; - - public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs"; - public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250"; - - public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance"; - public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50"; - - private final ConfigurationFileRepresentation configuration; - - public GeneralConfig(ConfigurationFileRepresentation configuration) { - this.configuration = configuration; - this.configuration.addSection(SECTION_MARKER); - defaults(); - } - - public Boolean getEnabled() { - Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED); - return enabled; - } - - public String getHostPort() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT); - } - - public String getTransportType() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE); - } - - public String getTopic() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC); - } - - public String getTimeout() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT); - } - - public String getLimit() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT); - } - - public String getContenttype() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE); - } - - public String getMaxBatchSize() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE); - } - - public String getMaxAgeMs() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS); - } - - public String getMessageSentThreadOccurrence() { - return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - - @Override - public String getSectionName() { - return SECTION_MARKER; - } - - @Override - public void defaults() { - // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, - DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT, - DEFAULT_VALUE_PUBLISHER_HOST_PORT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC, - DEFAULT_VALUE_PUBLISHER_TOPIC); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE, - DEFAULT_VALUE_PUBLISHER_CONTENTTYPE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT, - DEFAULT_VALUE_PUBLISHER_TIMEOUT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT, - DEFAULT_VALUE_PUBLISHER_LIMIT); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, - DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS, - DEFAULT_VALUE_PUBLISHER_MAXAGEMS); - configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, - DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE); - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java index f9b7b1e6a..5cdf5abc6 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java @@ -34,14 +34,14 @@ import org.slf4j.LoggerFactory; public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class); private NetconfNodeStateService netconfNodeStateService; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; private ListenerRegistration<MountpointNodeConnectListenerImpl> registeredNodeConnectListener; public MountpointNodeConnectListenerImpl(NetconfNodeStateService netconfNodeStateService) { this.netconfNodeStateService = netconfNodeStateService; } - public void start(MountpointStatePublisherMain mountpointStatePublisher) { + public void start(MountpointStatePublisher mountpointStatePublisher) { this.mountpointStatePublisher = mountpointStatePublisher; registeredNodeConnectListener = netconfNodeStateService.registerNetconfNodeConnectListener(this); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java index bbfd87961..d8b5a85de 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java @@ -31,14 +31,14 @@ import org.slf4j.LoggerFactory; public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class); private NetconfNodeStateService netconfNodeStateService; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; private ListenerRegistration<MountpointNodeStateListenerImpl> registeredNodeStateListener; public MountpointNodeStateListenerImpl(NetconfNodeStateService netconfNodeStateService) { this.netconfNodeStateService = netconfNodeStateService; } - public void start(MountpointStatePublisherMain mountpointStatePublisher) { + public void start(MountpointStatePublisher mountpointStatePublisher) { this.mountpointStatePublisher = mountpointStatePublisher; registeredNodeStateListener = netconfNodeStateService.registerNetconfNodeStateListener(this); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java index e31032393..6838bc3b2 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java @@ -24,27 +24,22 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; -import java.io.IOException; - -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener { +public class MountpointStateProviderImpl implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class); private static final String APPLICATION_NAME = "mountpoint-state-provider"; - private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties"; private NetconfNodeStateService netconfNodeStateService; - private GeneralConfig generalConfig; - private boolean dmaapEnabled = false; + private NetconfNetworkElementService netconfNetworkElementService; private MountpointNodeConnectListenerImpl nodeConnectListener; private MountpointNodeStateListenerImpl nodeStateListener; - private MountpointStatePublisherMain mountpointStatePublisher; + private MountpointStatePublisher mountpointStatePublisher; public MountpointStateProviderImpl() { LOG.info("Creating provider class for {}", APPLICATION_NAME); @@ -56,73 +51,41 @@ public class MountpointStateProviderImpl implements AutoCloseable, IConfigChange this.netconfNodeStateService = netconfNodeStateService; } + public void setNetconfNetworkElementService(NetconfNetworkElementService netconfNetworkElementService) { + this.netconfNetworkElementService = netconfNetworkElementService; + } + public void init() { LOG.info("Init call for {}", APPLICATION_NAME); - ConfigurationFileRepresentation configFileRepresentation = - new ConfigurationFileRepresentation(CONFIGURATIONFILE); - configFileRepresentation.registerConfigChangedListener(this); nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateService); nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateService); - generalConfig = new GeneralConfig(configFileRepresentation); - if (generalConfig.getEnabled()) { //dmaapEnabled - startPublishing(); - } + startPublishing(); } /** * Reflect status for Unit Tests - * + * * @return Text with status */ public String isInitializationOk() { return "No implemented"; } - @Override - public void onConfigChanged() { - LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled()); - boolean dmaapEnabledNewVal = generalConfig.getEnabled(); - - // DMaap disabled earlier (or during bundle startup) but enabled later, start publisher(s) - if (!dmaapEnabled && dmaapEnabledNewVal) { - LOG.info("DMaaP is enabled, starting Publisher"); - startPublishing(); - } else if (dmaapEnabled && !dmaapEnabledNewVal) { - // DMaap enabled earlier (or during bundle startup) but disabled later, stop publisher(s) - LOG.info("DMaaP is disabled, stop publisher"); - stopPublishing(); - } - dmaapEnabled = dmaapEnabledNewVal; - } - public void startPublishing() { - mountpointStatePublisher = new MountpointStatePublisherMain(generalConfig); - mountpointStatePublisher.start(); + mountpointStatePublisher = new MountpointStatePublisher(netconfNetworkElementService.getServiceProvider().getVESCollectorService()); + Thread t = new Thread(mountpointStatePublisher); + t.start(); nodeConnectListener.start(mountpointStatePublisher); nodeStateListener.start(mountpointStatePublisher); } - public void stopPublishing() { - try { - nodeConnectListener.stop(); - nodeStateListener.stop(); - mountpointStatePublisher.stop(); - } catch (Exception e) { - LOG.error("Exception while stopping publisher ", e); - } - } - @Override public void close() throws Exception { LOG.info("{} closing ...", this.getClass().getName()); - try { - mountpointStatePublisher.stop(); - } catch (IOException | InterruptedException e) { - LOG.error("Exception while stopping publisher ", e); - } + mountpointStatePublisher.stop(); close(nodeConnectListener, nodeStateListener); LOG.info("{} closing done", APPLICATION_NAME); } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java new file mode 100644 index 000000000..9df37e305 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import java.util.LinkedList; +import java.util.List; +import org.eclipse.jdt.annotation.NonNull; +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MountpointStatePublisher implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class); + private List<JSONObject> stateObjects = new LinkedList<JSONObject>(); + private boolean publish = true; + private int publishPause = 5000; // Default pause between fetch - 5 seconds + private VESCollectorService vesCollectorService; + + public MountpointStatePublisher(@NonNull VESCollectorService vesCollectorService) { + this.vesCollectorService = vesCollectorService; + } + + public void addToPublish(JSONObject publishObj) { + getStateObjects().add(publishObj); + } + + public synchronized List<JSONObject> getStateObjects() { + return stateObjects; + } + + public void stop() { + publish = false; + } + + private void pauseThread() throws InterruptedException { + if (publishPause > 0) { + LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause); + Thread.sleep(publishPause); + } else { + LOG.debug("No data yet to publish. No publish pause specified - retrying immediately"); + } + } + + + public String createVESMessage(JSONObject msg, VESCollectorCfgService vesCfg) { + MountpointStateVESMessageFormatter vesFormatter = new MountpointStateVESMessageFormatter(vesCfg); + String vesMsg = vesFormatter.createVESMessage(msg); + return vesMsg; + } + + @Override + public void run() { + while (publish) { + try { + if (getStateObjects().size() > 0) { + JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst(); + String vesMsg = createVESMessage(obj, vesCollectorService.getConfig()); + this.vesCollectorService.publishVESMessage(vesMsg); + } else { + pauseThread(); + } + } catch (Exception ex) { + LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); + } + } + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java deleted file mode 100644 index 8d6e2d4a3..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP : ccsdk features - * ================================================================================ - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. - * All rights reserved. - * ================================================================================ - * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================= - * - */ -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.json.JSONObject; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MountpointStatePublisherMain { - - private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisherMain.class); - private Thread thread = null; - private MRBatchingPublisher pub = null; - private List<JSONObject> stateObjects = new LinkedList<JSONObject>(); - private Properties publisherProperties = new Properties(); - private boolean closePublisher = false; - private int publishPause = 5000; // Default pause between fetch - 5 seconds - - public MountpointStatePublisherMain(Configuration config) { - initialize(config); - } - - public void initialize(Configuration config) { - LOG.info("In initializePublisher method of MountpointStatePublisher"); - GeneralConfig generalCfg = (GeneralConfig) config; - - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs()); - publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE, - generalCfg.getMessageSentThreadOccurrence()); - - createPublisher(publisherProperties); - } - - public MRBatchingPublisher createPublisher(Properties publisherProperties) { - - try { - pub = MRClientFactory.createBatchingPublisher(publisherProperties, false); - return pub; - } catch (IOException e) { - LOG.info("Exception while creating a publisher", e); - } - return null; - } - - public void publishMessage(MRBatchingPublisher pub, String msg) { - LOG.info("Publishing message {} - ", msg); - try { - pub.send(msg); - } catch (IOException e) { - LOG.info("Exception while publishing a mesage ", e); - } - } - - public MRBatchingPublisher getPublisher() { - return pub; - } - - public void start() { - thread = new Thread(new MountpointStatePublisher()); - thread.start(); - } - - public void stop() throws IOException, InterruptedException { - closePublisher = true; - getPublisher().close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close) - } - - private void pauseThread() throws InterruptedException { - if (publishPause > 0) { - LOG.debug("No data yet to publish. Pausing {} ms before retry ", publishPause); - Thread.sleep(publishPause); - } else { - LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately"); - } - } - - public void addToPublish(JSONObject publishObj) { - getStateObjects().add(publishObj); - } - - public List<JSONObject> getStateObjects() { - return stateObjects; - } - - public class MountpointStatePublisher implements Runnable { - - @Override - public void run() { - while (!closePublisher) { - try { - if (getStateObjects().size() > 0) { - JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst(); - publishMessage(getPublisher(), obj.toString()); - } else { - pauseThread(); - } - } catch (Exception ex) { - LOG.error("Exception while publishing message, ignoring and continuing ... ", ex); - } - - MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called - LOG.debug("Response message = {} ", res.toString()); - } - - } - - } - -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java new file mode 100644 index 000000000..918438e55 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : ccsdk features + * ================================================================================ + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Instant; +import org.json.JSONObject; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MountpointStateVESMessageFormatter { + private static final Logger LOG = LoggerFactory.getLogger(MountpointStateVESMessageFormatter.class); + private static final String VES_DOMAIN = "notification"; + private static final String VES_PRIORITY = "Normal"; + private static final String VES_CHANGETYPE = "ConnectionState"; + + private VESCollectorCfgService vesCfg; + static long sequenceNo = 0; + + public MountpointStateVESMessageFormatter(VESCollectorCfgService vesCfg) { + this.vesCfg = vesCfg; + } + + public String createVESMessage(JSONObject obj) { + LOG.debug("JSON Object to format to VES is - {}", obj.toString()); + String vesMsg = "{}"; + sequenceNo++; + + VESCommonEventHeaderPOJO vesCommonEventHeader = createVESCommonEventHeader(obj); + VESNotificationFieldsPOJO vesNotificationFields = createVESNotificationFields(obj); + + VESEvent vesEvent = new VESEvent(); + vesEvent.addEventObjects(vesCommonEventHeader); + vesEvent.addEventObjects(vesNotificationFields); + + try { + ObjectMapper objMapper = new ObjectMapper(); + vesMsg = objMapper.writeValueAsString(vesEvent); + LOG.debug("VES message to be published - {}", vesMsg); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + return vesMsg; + + } + + private VESNotificationFieldsPOJO createVESNotificationFields(JSONObject obj) { + VESNotificationFieldsPOJO vesNotificationFields = new VESNotificationFieldsPOJO(); + + vesNotificationFields.setChangeIdentifier(obj.getString("NodeId")); + vesNotificationFields.setChangeType(VES_CHANGETYPE); + vesNotificationFields.setNewState(obj.getString("NetConfNodeState")); + + return vesNotificationFields; + } + + private VESCommonEventHeaderPOJO createVESCommonEventHeader(JSONObject obj) { + VESCommonEventHeaderPOJO vesCommonEventHeader = new VESCommonEventHeaderPOJO(); + + vesCommonEventHeader.setDomain(VES_DOMAIN); + vesCommonEventHeader + .setEventId(obj.getString("NodeId") + "_" + obj.getString("NetConfNodeState") + "_" + sequenceNo); + vesCommonEventHeader + .setEventName(obj.getString("NodeId") + "_" + obj.getString("NetConfNodeState") + "_" + sequenceNo); + vesCommonEventHeader.setSourceName(obj.getString("NodeId")); + vesCommonEventHeader.setPriority(VES_PRIORITY); + vesCommonEventHeader.setReportingEntityName(this.vesCfg.getReportingEntityName()); + vesCommonEventHeader.setSequence(sequenceNo); + + Instant time = (Instant) obj.get("TimeStamp"); + vesCommonEventHeader.setLastEpochMicrosec(time.toEpochMilli() * 100); + vesCommonEventHeader.setStartEpochMicrosec(time.toEpochMilli() * 100); + + return vesCommonEventHeader; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java new file mode 100644 index 000000000..7b15ae152 --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java @@ -0,0 +1,190 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +@JsonPropertyOrder({"domain", "eventId", "eventName", "eventType", "lastEpochMicrosec", "nfcNamingCode", "nfNamingCode", "nfVendorName", "priority", "reportingEntityId", + "reportingEntityName", "sequence", "sourceId", "sourceName", "startEpochMicrosec", "timeZoneOffset", "version", "vesEventListenerVersion"}) +public class VESCommonEventHeaderPOJO { + + private String domain = ""; + private String eventId = ""; + private String eventName = ""; + private String eventType = ""; + private long sequence = 0L; + private String priority = ""; + @JsonIgnore + private String reportingEntityId = ""; + private String reportingEntityName = ""; + private String sourceId = ""; + private String sourceName = ""; + private long startEpochMicrosec = 0L; + private long lastEpochMicrosec = 0L; + private String nfcNamingCode = ""; + private String nfNamingCode = ""; + private String nfVendorName = ""; + private String timeZoneOffset = "+00:00"; + private String version = "4.1"; + private String vesEventListenerVersion = "7.1.1"; + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getEventId() { + return eventId; + } + + public void setEventId(String eventId) { + this.eventId = eventId; + } + + public String getEventName() { + return eventName; + } + + public void setEventName(String eventName) { + this.eventName = eventName; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public Long getSequence() { + return sequence; + } + + public void setSequence(long sequenceNo) { + this.sequence = sequenceNo; + } + + public String getPriority() { + return priority; + } + + public void setPriority(String priority) { + this.priority = priority; + } + + public String getReportingEntityId() { + return reportingEntityId; + } + + public void setReportingEntityId(String reportingEntityId) { + this.reportingEntityId = reportingEntityId; + } + + public String getReportingEntityName() { + return reportingEntityName; + } + + public void setReportingEntityName(String reportingEntityName) { + this.reportingEntityName = reportingEntityName; + } + + public String getSourceId() { + return sourceId; + } + + public void setSourceId(String sourceId) { + this.sourceId = sourceId; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public long getStartEpochMicrosec() { + return startEpochMicrosec; + } + + public void setStartEpochMicrosec(long startEpochMicrosec) { + this.startEpochMicrosec = startEpochMicrosec; + } + + public long getLastEpochMicrosec() { + return lastEpochMicrosec; + } + + public void setLastEpochMicrosec(long lastEpochMicrosec) { + this.lastEpochMicrosec = lastEpochMicrosec; + } + + public String getNfcNamingCode() { + return nfcNamingCode; + } + + public void setNfcNamingCode(String nfcNamingCode) { + this.nfcNamingCode = nfcNamingCode; + } + + public String getNfNamingCode() { + return nfNamingCode; + } + + public void setNfNamingCode(String nfNamingCode) { + this.nfNamingCode = nfNamingCode; + } + + public String getNfVendorName() { + return nfVendorName; + } + + public void setNfVendorName(String nfVendorName) { + this.nfVendorName = nfVendorName; + } + + public String getTimeZoneOffset() { + return timeZoneOffset; + } + + public void setTimeZoneOffset(String timeZoneOffset) { + this.timeZoneOffset = timeZoneOffset; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getVesEventListenerVersion() { + return vesEventListenerVersion; + } + + public void setVesEventListenerVersion(String vesEventListenerVersion) { + this.vesEventListenerVersion = vesEventListenerVersion; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java new file mode 100644 index 000000000..13017d46c --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import java.util.HashMap; +import java.util.Map; + +public class VESEvent { + public Map<String, Object> event = new HashMap<String, Object>(); + + public void addEventObjects(Object eventObject) { + if (eventObject instanceof VESCommonEventHeaderPOJO) + event.put("commonEventHeader", eventObject); + else if (eventObject instanceof VESNotificationFieldsPOJO) + event.put("notificationFields", eventObject); + + } + + public Map<String, Object> getEvent() { + return event; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java new file mode 100644 index 000000000..1963cd6ca --- /dev/null +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java @@ -0,0 +1,108 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt + * ================================================================================================= + * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ +package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import java.util.ArrayList; +import java.util.HashMap; + +@JsonPropertyOrder({"arrayOfNamedHashMap", "changeContact", "changeIdentifier", "changeType", "newState", "oldState", "notificationFieldsVersion"}) +public class VESNotificationFieldsPOJO { + + private ArrayList<HashMap<String, Object>> arrayOfNamedHashMap = new ArrayList<HashMap<String, Object>>(); + @JsonIgnore + private HashMap<String, Object> namedHashMap = new HashMap<String, Object>(); + @JsonIgnore + private HashMap<String, String> hashMap = new HashMap<String, String>(); + @JsonIgnore + private String changeContact = ""; + private String changeIdentifier = ""; + private String changeType = ""; + //@JsonIgnore + private String newState = ""; + @JsonIgnore + private String oldState = ""; + @JsonIgnore + private String stateInterface = ""; + private String notificationFieldsVersion = "2.0"; + + public ArrayList<HashMap<String, Object>> getArrayOfNamedHashMap() { + return arrayOfNamedHashMap; + } + + public void setArrayOfNamedHashMap(ArrayList<HashMap<String, Object>> arrayOfNamedHashMap) { + this.arrayOfNamedHashMap = arrayOfNamedHashMap; + } + + public String getChangeContact() { + return changeContact; + } + + public void setChangeContact(String changeContact) { + this.changeContact = changeContact; + } + + public String getChangeIdentifier() { + return changeIdentifier; + } + + public void setChangeIdentifier(String changeIdentifier) { + this.changeIdentifier = changeIdentifier; + } + + public String getChangeType() { + return changeType; + } + + public void setChangeType(String changeType) { + this.changeType = changeType; + } + + public String getNewState() { + return newState; + } + + public void setNewState(String newState) { + this.newState = newState; + } + + public String getOldState() { + return oldState; + } + + public void setOldState(String oldState) { + this.oldState = oldState; + } + + public String getStateInterface() { + return stateInterface; + } + + public void setStateInterface(String stateInterface) { + this.stateInterface = stateInterface; + } + + public String getNotificationFieldsVersion() { + return notificationFieldsVersion; + } + + public void setNotificationFieldsVersion(String notificationFieldsVersion) { + this.notificationFieldsVersion = notificationFieldsVersion; + } +} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml index 3ec9efbdd..5512359df 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml +++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml @@ -29,9 +29,14 @@ <reference id="netconfNodeStateService" availability="mandatory" activation="eager" interface="org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService"/> + + <reference id="netconfNetworkElementService" + availability="mandatory" activation="eager" + interface="org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService"/> <bean id="provider" class="org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStateProviderImpl" init-method="init" destroy-method="close"> <property name="netconfNodeStateService" ref="netconfNodeStateService"/> + <property name="netconfNetworkElementService" ref="netconfNetworkElementService"/> </bean> </blueprint> diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java deleted file mode 100644 index c921e7bcb..000000000 --- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ============LICENSE_START======================================================================== - * ONAP : ccsdk feature sdnr wt - * ================================================================================================= - * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ - -package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test; - -import static org.junit.Assert.assertEquals; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import org.junit.After; -import org.junit.Test; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig; - -public class GeneralConfigTest { - - // @formatter:off - private static final String TESTCONFIG_CONTENT = - "[general]\n" - + "dmaapEnabled=false\n" - + "TransportType=HTTPNOAUTH\n" - + "host=onap-dmap:3904\n" - + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n" - + "contenttype=application/json\n" - + "timeout=20000\n" - + "limit=10000\n" - + "maxBatchSize=100\n" - + "maxAgeMs=250\n" - + "MessageSentThreadOccurance=50\n"; - // @formatter:on - private final String fileName = "test.properties"; - private ConfigurationFileRepresentation globalCfg; - - @Test - public void test() throws IOException { - - Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT); - globalCfg = new ConfigurationFileRepresentation(fileName); - GeneralConfig cfg = new GeneralConfig(globalCfg); - assertEquals("onap-dmap:3904", cfg.getHostPort()); - assertEquals(false, cfg.getEnabled()); - assertEquals("unauthenticated.SDNR_MOUNTPOINT_STATE_INFO", cfg.getTopic()); - assertEquals("application/json", cfg.getContenttype()); - assertEquals("20000", cfg.getTimeout()); - assertEquals("10000", cfg.getLimit()); - assertEquals("100", cfg.getMaxBatchSize()); - assertEquals("250", cfg.getMaxAgeMs()); - assertEquals("50", cfg.getMessageSentThreadOccurrence()); - assertEquals("HTTPNOAUTH", cfg.getTransportType()); - assertEquals("general", cfg.getSectionName()); - - - } - - @After - public void cleanUp() { - File file = new File(fileName); - if (file.exists()) { - System.out.println("File exists, Deleting it"); - file.delete(); - } - - } -} diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java index 2466683fd..237e94989 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java @@ -18,19 +18,19 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import com.google.common.io.Files; -import java.io.File; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Optional; -import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointNodeConnectListenerImpl; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain; +import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfAccessorMock; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeMock; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeStateServiceMock; @@ -40,39 +40,32 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. public class TestMountpointNodeConnectListenerImpl { - // @formatter:off - private static final String TESTCONFIG_CONTENT = - "[general]\n" - + "dmaapEnabled=false\n" - + "TransportType=HTTPNOAUTH\n" - + "host=onap-dmap:3904\n" - + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n" - + "contenttype=application/json\n" - + "timeout=20000\n" - + "limit=10000\n" - + "maxBatchSize=100\n" - + "maxAgeMs=250\n" - + "MessageSentThreadOccurance=50\n"; - // @formatter:on - private final String fileName = "test1.properties"; - private ConfigurationFileRepresentation globalCfg; - - - NetconfNodeStateServiceMock netconfNodeStateServiceMock = new NetconfNodeStateServiceMock(); - MountpointNodeConnectListenerImpl nodeConnectListener = - new MountpointNodeConnectListenerImpl(netconfNodeStateServiceMock); - MountpointStatePublisherMain mountpointStatePublisher; - NetconfNodeMock netconfNodeMock = new NetconfNodeMock(); - NetconfNode netconfNode = netconfNodeMock.getNetconfNode(); - NodeId nNodeId = new NodeId("nSky"); - NetconfAccessor accessor = new NetconfAccessorMock(nNodeId, netconfNode); + DeviceManagerServiceProvider serviceProvider; + MountpointStatePublisher mountpointStatePublisher; + NetconfNodeStateServiceMock netconfNodeStateServiceMock; + MountpointNodeConnectListenerImpl nodeConnectListener; + NetconfNodeMock netconfNodeMock; + NetconfNode netconfNode; + NodeId nNodeId; + NetconfAccessor accessor; + VESCollectorService vesCollectorService; @Before public void initialize() throws IOException { - Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT); - globalCfg = new ConfigurationFileRepresentation(fileName); - GeneralConfig cfg = new GeneralConfig(globalCfg); - mountpointStatePublisher = new MountpointStatePublisherMain(cfg); + serviceProvider = mock(DeviceManagerServiceProvider.class); + netconfNodeStateServiceMock = new NetconfNodeStateServiceMock(); + netconfNodeMock = new NetconfNodeMock(); + netconfNode = netconfNodeMock.getNetconfNode(); + vesCollectorService = mock(VESCollectorService.class); + NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class); + nNodeId = new NodeId("nSky"); + accessor = new NetconfAccessorMock(nNodeId, netconfNode); + + mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService); + when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider); + when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService); + + nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateServiceMock); nodeConnectListener.start(mountpointStatePublisher); } @@ -90,18 +83,8 @@ public class TestMountpointNodeConnectListenerImpl { @Test public void testClose() throws Exception { - //assertEquals(MountpointStatePublisher.stateObjects.size(), 0); + assertEquals(mountpointStatePublisher.getStateObjects().size(), 0); nodeConnectListener.close(); } - @After - public void after() { - File file = new File(fileName); - if (file.exists()) { - System.out.println("File exists, Deleting it"); - file.delete(); - } - - } - } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java index 8d95a4c53..c6a9d11e5 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java @@ -21,17 +21,15 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import com.google.common.io.Files; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import org.junit.After; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.junit.Before; import org.junit.Test; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointNodeStateListenerImpl; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain; +import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeMock; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeStateServiceMock; import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode; @@ -39,37 +37,23 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. public class TestMountpointNodeStateListenerImpl { - // @formatter:off - private static final String TESTCONFIG_CONTENT = - "[general]\n" - + "dmaapEnabled=false\n" - + "TransportType=HTTPNOAUTH\n" - + "host=onap-dmap:3904\n" - + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n" - + "contenttype=application/json\n" - + "timeout=20000\n" - + "limit=10000\n" - + "maxBatchSize=100\n" - + "maxAgeMs=250\n" - + "MessageSentThreadOccurance=50\n"; - // @formatter:on - private final String fileName = "test2.properties"; - private ConfigurationFileRepresentation globalCfg; - NetconfNodeStateServiceMock netconfNodeStateServiceMock = new NetconfNodeStateServiceMock(); MountpointNodeStateListenerImpl nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateServiceMock); - MountpointStatePublisherMain mountpointStatePublisher; + MountpointStatePublisher mountpointStatePublisher; NetconfNodeMock netconfNodeMock = new NetconfNodeMock(); NetconfNode netconfNode = netconfNodeMock.getNetconfNode(); NodeId nNodeId = new NodeId("nSky"); + VESCollectorService vesCollectorService; @Before - public void initialize() throws IOException { - Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT); - globalCfg = new ConfigurationFileRepresentation(fileName); - GeneralConfig cfg = new GeneralConfig(globalCfg); - mountpointStatePublisher = new MountpointStatePublisherMain(cfg); + public void initialize() { + DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class); + vesCollectorService = mock(VESCollectorService.class); + NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class); + when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider); + when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService); + mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService); nodeStateListener.start(mountpointStatePublisher); } @@ -92,14 +76,4 @@ public class TestMountpointNodeStateListenerImpl { nodeStateListener.onRemoved(nNodeId); } - @After - public void after() { - File file = new File(fileName); - if (file.exists()) { - System.out.println("File exists, Deleting it"); - file.delete(); - } - - } - } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java index 19d930866..1cebeb697 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java @@ -24,108 +24,46 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileNotFoundException; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStateProviderImpl; +import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestMountpointStateProviderImpl { - - private static Path KARAF_ETC = Paths.get("etc"); - private static MountpointStateProviderImpl mountpointStateProvider; - private static final Logger LOG = LoggerFactory.getLogger(TestMountpointStateProviderImpl.class); + private MountpointStateProviderImpl mountpointStateProvider; - - @BeforeClass - public static void before() throws InterruptedException, IOException { - - System.out.println("Logger: " + LOG.getClass().getName() + " " + LOG.getName()); - // Call System property to get the classpath value - Path etc = KARAF_ETC; - delete(etc); - - System.out.println("Create empty:" + etc.toString()); - Files.createDirectories(etc); - - // Create mocks - - // start using blueprint interface - try { - mountpointStateProvider = new MountpointStateProviderImpl(); - - //mountpointStateProvider.init(); // Can't be tested as this invokes a thread. Mockito doesn't help either - } catch (Exception e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - fail("Not initialized" + sw.toString()); - } - System.out.println("Initialization status: " + mountpointStateProvider.isInitializationOk()); - System.out.println("Initialization done"); + @Test + public void before() throws InterruptedException, IOException { + NetconfNodeStateService netconfNodeStateService = mock(NetconfNodeStateService.class); + DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class); + VESCollectorService vesCollectorService = mock(VESCollectorService.class); + NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class); + + when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider); + when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService); + + mountpointStateProvider = new MountpointStateProviderImpl(); + mountpointStateProvider.setNetconfNetworkElementService(netconfNetworkElementService); + mountpointStateProvider.setNetconfNodeStateService(netconfNodeStateService); + mountpointStateProvider.init(); } - @AfterClass - public static void after() throws InterruptedException, IOException { + /* @After + public void after() throws InterruptedException, IOException { - System.out.println("Start shutdown"); - // close using blueprint interface try { - mountpointStateProvider.close(); + mountpointStateProvider.close(); } catch (Exception e) { System.out.println(e); } - delete(KARAF_ETC); - - } - - @Test - public void test1() { - System.out.println("Test1: slave mountpoint"); - System.out.println("Initialization status: " + mountpointStateProvider.isInitializationOk()); - System.out.println("Test2: Done"); - } - - // ********************* Private - - private static void delete(Path etc) throws IOException { - if (Files.exists(etc)) { - System.out.println("Found and remove:" + etc.toString()); - delete(etc.toFile()); - } - } - - private static void delete(File f) throws IOException { - if (f.isDirectory()) { - for (File c : f.listFiles()) { - delete(c); - } - } - if (!f.delete()) { - throw new FileNotFoundException("Failed to delete file: " + f); - } - } - /* @Test - public void testInit() { - - } - - @Test - public void testOnConfigChanged() { - //fail("Not yet implemented"); - }*/ - + }*/ } diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java index 5fc1c5e15..468e0c1ee 100644 --- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java +++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java @@ -24,136 +24,58 @@ package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import org.json.JSONObject; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration; -import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig; -import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; -import org.slf4j.Logger; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService; +import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService; +import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher; public class TestMountpointStatePublisher { - private static final String CONFIGURATIONTESTFILE = "test3.properties"; - public Thread publisher; - MountpointStatePublisherMain mountpointStatePublisher; - ConfigurationFileRepresentation configFileRepresentation; - GeneralConfig cfg; + MountpointStatePublisher mountpointStatePublisher; + VESCollectorService vesCollectorService; + VESCollectorCfgService vesCfg; + String vesMsg = "{}"; + JSONObject testJsonData; @Before public void testMountpointStatePublisherData() { - String testJsonData = - "{\"NodeId\":\"69322972e178_50001\",\"NetConfNodeState\":\"Connecting\",\"TimeStamp\":\"2019-11-12T12:45:08.604Z\"}"; - configFileRepresentation = - new ConfigurationFileRepresentation(CONFIGURATIONTESTFILE); - cfg = new GeneralConfig(configFileRepresentation); - JSONObject jsonObj = new JSONObject(testJsonData); - mountpointStatePublisher = new MountpointStatePublisherMain(cfg); - mountpointStatePublisher.getStateObjects().add(jsonObj); + testJsonData = new JSONObject(); + testJsonData.put("NodeId", "69322972e178_50001"); + testJsonData.put("NetConfNodeState", "Connecting"); + testJsonData.put("TimeStamp", java.time.Clock.systemUTC().instant()); + + + DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class); + vesCollectorService = mock(VESCollectorService.class); + vesCfg = mock(VESCollectorCfgService.class); + NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class); + when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider); + when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService); + when(vesCollectorService.getConfig()).thenReturn(vesCfg); + when(vesCfg.getReportingEntityName()).thenReturn("ONAP SDN-R"); + when(vesCollectorService.publishVESMessage(vesMsg)).thenReturn(true); + + mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService); + mountpointStatePublisher.addToPublish(testJsonData); + //mountpointStatePublisher.getStateObjects().add(testJsonData); } @Test public void testMountpointStatePublisherConfiguration() throws InterruptedException { - ConfigurationFileRepresentation configFileRepresentation = - new ConfigurationFileRepresentation(CONFIGURATIONTESTFILE); - GeneralConfig cfg = new GeneralConfig(configFileRepresentation); - - MountpointStatePublisherMain pub = new MountpointStatePublisherMock(cfg); - pub.createPublisher(null); - pub.publishMessage(pub.createPublisher(null), "Test DMaaP Message"); - - } - - public class MountpointStatePublisherMock extends MountpointStatePublisherMain { - - public MountpointStatePublisherMock(Configuration config) { - super(config); - } - - @Override - public MRBatchingPublisher createPublisher(Properties publisherProperties) { - - return new MRBatchingPublisher() { - - @Override - public int send(String msg) throws IOException { - System.out.println("Message to send - " + msg); - return 0; - } - - @Override - public int send(String partition, String msg) throws IOException { - return 0; - } - - @Override - public int send(message msg) throws IOException { - return 0; - } - - @Override - public int send(Collection<message> msgs) throws IOException { - return 0; - } - - @Override - public void close() { - - } - - @Override - public void logTo(Logger log) { - - } - - @Override - public void setApiCredentials(String apiKey, String apiSecret) { - - } - - @Override - public void clearApiCredentials() { - - } - - @Override - public int getPendingMessageCount() { - return 0; - } - - @Override - public List<message> close(long timeout, TimeUnit timeoutUnits) - throws IOException, InterruptedException { - return null; - } - - @Override - public MRPublisherResponse sendBatchWithResponse() { - return null; - } - - }; - } + Thread t = new Thread(mountpointStatePublisher); + t.start(); + Thread.sleep(7000); } @After - public void after() { - File file = new File(CONFIGURATIONTESTFILE); - if (file.exists()) { - System.out.println("File exists, Deleting it"); - file.delete(); - } - + public void close() { + mountpointStatePublisher.stop(); } - } |