aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--plugins/forwarding-plugins/src/main/java/org/onap/policy/distribution/forwarding/file/FilePolicyForwarder.java6
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java39
2 files changed, 24 insertions, 21 deletions
diff --git a/plugins/forwarding-plugins/src/main/java/org/onap/policy/distribution/forwarding/file/FilePolicyForwarder.java b/plugins/forwarding-plugins/src/main/java/org/onap/policy/distribution/forwarding/file/FilePolicyForwarder.java
index d02d9bba..51700005 100644
--- a/plugins/forwarding-plugins/src/main/java/org/onap/policy/distribution/forwarding/file/FilePolicyForwarder.java
+++ b/plugins/forwarding-plugins/src/main/java/org/onap/policy/distribution/forwarding/file/FilePolicyForwarder.java
@@ -55,7 +55,7 @@ public class FilePolicyForwarder implements PolicyForwarder {
fileForwarderParameters = ParameterService.get(parameterGroupName);
try {
Path path = Paths.get(fileForwarderParameters.getPath());
- if (!Files.exists(path)) {
+ if (!path.toFile().exists()) {
Files.createDirectories(path);
}
} catch (final InvalidPathException | IOException e) {
@@ -72,8 +72,8 @@ public class FilePolicyForwarder implements PolicyForwarder {
if (policy instanceof OptimizationPolicy) {
forwardPolicy((OptimizationPolicy) policy);
} else {
- final String message = new String("Cannot forward policy " + policy
- + ". Unsupported policy type " + policy.getClass().getSimpleName());
+ final String message = "Cannot forward policy " + policy
+ + ". Unsupported policy type " + policy.getClass().getSimpleName();
LOGGER.error(message);
throw new PolicyForwardingException(message);
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
index 72341ee5..df5302c0 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
@@ -123,24 +123,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
running = true;
while (running) {
key = watcher.take();
-
- for (final WatchEvent<?> event : key.pollEvents()) {
- final WatchEvent<Path> ev = (WatchEvent<Path>) event;
- final Path fileName = ev.context();
- pool.execute(() -> {
- LOGGER.debug("new CSAR found: {}", fileName);
- DistributionStatisticsManager.updateTotalDistributionCount();
- final String fullFilePath = dir.toString() + File.separator + fileName.toString();
- try {
- waitForFileToBeReady(fullFilePath);
- createPolicyInputAndCallHandler(fullFilePath);
- LOGGER.debug("CSAR complete: {}", fileName);
- } catch (InterruptedException e) {
- LOGGER.error("waitForFileToBeReady interrupted", e);
- Thread.currentThread().interrupt();
- }
- });
- }
+ processFileEvents(dir, key, pool);
final boolean valid = key.reset();
if (!valid) {
LOGGER.error("Watch key no longer valid!");
@@ -152,6 +135,26 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
}
}
+ private void processFileEvents(Path dir, WatchKey key, ExecutorService pool) {
+ for (final WatchEvent<?> event : key.pollEvents()) {
+ final WatchEvent<Path> ev = (WatchEvent<Path>) event;
+ final Path fileName = ev.context();
+ pool.execute(() -> {
+ LOGGER.debug("new CSAR found: {}", fileName);
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ try {
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
+ LOGGER.debug("CSAR complete: {}", fileName);
+ } catch (InterruptedException e) {
+ LOGGER.error("waitForFileToBeReady interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+ }
+
/**
* Method to create policy input & call policy handlers.
*