aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/reception-plugins/src/main/java/org
diff options
context:
space:
mode:
authorramverma <ram.krishna.verma@est.tech>2019-01-22 12:39:39 +0000
committerramverma <ram.krishna.verma@est.tech>2019-01-22 12:39:39 +0000
commit1936220d00ee644774e58ab33de39cca4a006d2a (patch)
tree59f9cd0d337a88424813e9eacb37a94720f15acd /plugins/reception-plugins/src/main/java/org
parenteac48bb3c587999376af202ab421275a27279523 (diff)
Fix issues in policy-distribution
1) Creating a new thread for watching directory path for new file. 2) Updating distribution statistics from FileSystemReceptionHandler. Change-Id: Ic539f2cad015f0756407fe910f309a2ea661a764 Issue-ID: POLICY-1437 Signed-off-by: ramverma <ram.krishna.verma@est.tech>
Diffstat (limited to 'plugins/reception-plugins/src/main/java/org')
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java61
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java)74
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java)2
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java)2
4 files changed, 123 insertions, 16 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
new file mode 100644
index 00000000..f8e57747
--- /dev/null
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distribution.reception.handling.file;
+
+import java.io.IOException;
+
+import org.onap.policy.common.logging.flexlogger.FlexLogger;
+import org.onap.policy.common.logging.flexlogger.Logger;
+
+/**
+ * This class implements Runnable interface for creating new thread which will be used as file watcher.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class FileClientHandler implements Runnable {
+
+ private static final Logger LOGGER = FlexLogger.getLogger(FileClientHandler.class);
+
+ private FileSystemReceptionHandler fileReceptionHandler;
+ private String watchPath;
+
+ /**
+ * Constructs an instance of {@link FileClientHandler} class.
+ *
+ * @param fileReceptionHandler the fileReceptionHandler
+ */
+ public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+ this.fileReceptionHandler = fileReceptionHandler;
+ this.watchPath = watchPath;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void run() {
+ try {
+ fileReceptionHandler.initFileWatcher(watchPath);
+ } catch (final IOException ex) {
+ LOGGER.error(ex);
+ }
+ }
+}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
index 941cdd61..3cb167f3 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Intel Corp. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
@@ -30,65 +31,85 @@ import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipFile;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
-
import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.distribution.model.Csar;
import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
+import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
/**
* Handles reception of inputs from File System which can be used to decode policies.
*/
public class FileSystemReceptionHandler extends AbstractReceptionHandler {
- private boolean running = false;
+
private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class);
+ private boolean running = false;
+ /**
+ * {@inheritDoc}.
+ */
@Override
protected void initializeReception(final String parameterGroupName) {
LOGGER.debug("FileSystemReceptionHandler init...");
try {
final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
- ParameterService.get(parameterGroupName);
- main(handlerParameters.getWatchPath());
+ ParameterService.get(parameterGroupName);
+ final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+ final Thread fileWatcherThread = new Thread(fileClientHandler);
+ fileWatcherThread.start();
} catch (final Exception ex) {
LOGGER.error(ex);
}
- running = false;
- LOGGER.debug("FileSystemReceptionHandler main loop exited...");
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public void destroy() {
- // Tear down subscription etc
running = false;
}
+ /**
+ * Method to check the running status of file watcher thread.
+ *
+ * @return the running status
+ */
public boolean isRunning() {
return running;
}
/**
- * Main entry point.
- *
+ * Initialize the file watcher thread.
+ *
* @param watchPath Path to watch
*/
- public void main(String watchPath) throws IOException {
+ public void initFileWatcher(final String watchPath) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: " + dir.getFileName());
- startMainLoop(watcher, dir);
+ startWatchService(watcher, dir);
} catch (final InterruptedException ex) {
LOGGER.debug(ex);
Thread.currentThread().interrupt();
}
}
+ /**
+ * Method to keep watching the given path for any new file created.
+ *
+ * @param watcher the watcher
+ * @param dir the watch directory
+ * @throws InterruptedException if it occurs
+ */
@SuppressWarnings("unchecked")
- protected void startMainLoop(WatchService watcher, Path dir) throws InterruptedException {
+ protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
WatchKey key;
running = true;
while (running) {
@@ -98,7 +119,10 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
final WatchEvent<Path> ev = (WatchEvent<Path>) event;
final Path fileName = ev.context();
LOGGER.debug("new CSAR found: " + fileName);
- createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString());
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
LOGGER.debug("CSAR complete: " + fileName);
}
final boolean valid = key.reset();
@@ -109,12 +133,34 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
}
}
+ /**
+ * Method to create policy input & call policy handlers.
+ *
+ * @param fileName the filename
+ */
protected void createPolicyInputAndCallHandler(final String fileName) {
try {
final Csar csarObject = new Csar(fileName);
+ DistributionStatisticsManager.updateTotalDownloadCount();
inputReceived(csarObject);
+ DistributionStatisticsManager.updateDownloadSuccessCount();
+ DistributionStatisticsManager.updateDistributionSuccessCount();
} catch (final PolicyDecodingException ex) {
+ DistributionStatisticsManager.updateDownloadFailureCount();
+ DistributionStatisticsManager.updateDistributionFailureCount();
LOGGER.error(ex);
}
}
+
+ private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
+ boolean flag = true;
+ while (flag) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ try (ZipFile zipFile = new ZipFile(fullFilePath)) {
+ flag = false;
+ } catch (final IOException exp) {
+ LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
+ }
+ }
+ }
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
index 37a16980..693ff0ec 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
/**
* This class builds an instance of {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
index 457cd5ea..dd50dc78 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import java.io.File;