diff options
Diffstat (limited to 'plugins/reception-plugins/src/main/java/org')
4 files changed, 63 insertions, 13 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java index fd4f69cc..2072a0eb 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java @@ -36,15 +36,18 @@ public class FileClientHandler implements Runnable { private FileSystemReceptionHandler fileReceptionHandler; private String watchPath; + private int maxThread; /** * Constructs an instance of {@link FileClientHandler} class. * * @param fileReceptionHandler the fileReceptionHandler */ - public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) { + public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath, + final int maxThread) { this.fileReceptionHandler = fileReceptionHandler; this.watchPath = watchPath; + this.maxThread = maxThread; } /** @@ -53,7 +56,7 @@ public class FileClientHandler implements Runnable { @Override public void run() { try { - fileReceptionHandler.initFileWatcher(watchPath); + fileReceptionHandler.initFileWatcher(watchPath, maxThread); } catch (final IOException ex) { LOGGER.error("Failed initializing file watcher thread", ex); } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java index a3f6fab0..0ddd2a92 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java @@ -31,6 +31,8 @@ import java.nio.file.Paths; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.zip.ZipFile; @@ -59,7 +61,9 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { try { final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters = ParameterService.get(parameterGroupName); - final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath()); + final FileClientHandler fileClientHandler = new FileClientHandler(this, + handlerParameters.getWatchPath(), + handlerParameters.getMaxThread()); final Thread fileWatcherThread = new Thread(fileClientHandler); fileWatcherThread.start(); } catch (final Exception ex) { @@ -89,13 +93,13 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { * * @param watchPath Path to watch */ - public void initFileWatcher(final String watchPath) throws IOException { + public void initFileWatcher(final String watchPath, final int maxThread) throws IOException { try (final WatchService watcher = FileSystems.getDefault().newWatchService()) { final Path dir = Paths.get(watchPath); dir.register(watcher, ENTRY_CREATE); LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName()); - startWatchService(watcher, dir); - } catch (final InterruptedException ex) { + startWatchService(watcher, dir, maxThread); + } catch (final Exception ex) { LOGGER.error("FileWatcher initialization failed", ex); Thread.currentThread().interrupt(); } @@ -106,11 +110,16 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { * * @param watcher the watcher * @param dir the watch directory + * @param maxThread the max thread number * @throws InterruptedException if it occurs */ @SuppressWarnings("unchecked") - protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException { + protected void startWatchService(final WatchService watcher, + final Path dir, + int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException { WatchKey key; + ExecutorService pool = Executors.newFixedThreadPool(maxThread); + running = true; while (running) { key = watcher.take(); @@ -118,12 +127,20 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { for (final WatchEvent<?> event : key.pollEvents()) { final WatchEvent<Path> ev = (WatchEvent<Path>) event; final Path fileName = ev.context(); - LOGGER.debug("new CSAR found: {}", fileName); - DistributionStatisticsManager.updateTotalDistributionCount(); - final String fullFilePath = dir.toString() + File.separator + fileName.toString(); - waitForFileToBeReady(fullFilePath); - createPolicyInputAndCallHandler(fullFilePath); - LOGGER.debug("CSAR complete: {}", fileName); + pool.execute(new Runnable() { + public void run() { + LOGGER.debug("new CSAR found: {}", fileName); + DistributionStatisticsManager.updateTotalDistributionCount(); + final String fullFilePath = dir.toString() + File.separator + fileName.toString(); + try { + waitForFileToBeReady(fullFilePath); + createPolicyInputAndCallHandler(fullFilePath); + LOGGER.debug("CSAR complete: {}", fileName); + } catch (InterruptedException e) { + LOGGER.error("waitForFileToBeReady interrupted", e); + } + } + }); } final boolean valid = key.reset(); if (!valid) { @@ -131,6 +148,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { break; } } + pool.shutdown(); } /** diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java index 693ff0ec..415e0d94 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java @@ -27,6 +27,7 @@ package org.onap.policy.distribution.reception.handling.file; public class FileSystemReceptionHandlerConfigurationParameterBuilder { private String watchPath; + private int maxThread = 1; /** * Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. @@ -38,6 +39,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder { return this; } + /** + * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. + * + * @param maxThread the max thread number in the thread pool + */ + public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) { + this.maxThread = maxThread; + return this; + } /** * Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. @@ -47,6 +57,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder { public String getWatchPath() { return watchPath; } + + /** + * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. + * + * @return the maxThread + */ + public int getMaxThread() { + return maxThread; + } } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java index dd50dc78..f9045816 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java @@ -24,6 +24,7 @@ import java.io.File; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.utils.validation.ParameterValidationUtils; import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup; /** @@ -33,6 +34,7 @@ import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigu public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup { private String watchPath; + private int maxThread; /** * The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class. @@ -42,12 +44,17 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep public FileSystemReceptionHandlerConfigurationParameterGroup( final FileSystemReceptionHandlerConfigurationParameterBuilder builder) { watchPath = builder.getWatchPath(); + maxThread = builder.getMaxThread(); } public String getWatchPath() { return watchPath; } + public int getMaxThread() { + return maxThread; + } + /** * {@inheritDoc}. */ @@ -55,6 +62,9 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep public GroupValidationResult validate() { final GroupValidationResult validationResult = new GroupValidationResult(this); validatePathElement(validationResult, watchPath, "watchPath"); + if (!ParameterValidationUtils.validateIntParameter(maxThread)) { + validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer"); + } return validationResult; } |