diff options
Diffstat (limited to 'plugins/reception-plugins')
7 files changed, 72 insertions, 18 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java index fd4f69cc..2072a0eb 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java @@ -36,15 +36,18 @@ public class FileClientHandler implements Runnable { private FileSystemReceptionHandler fileReceptionHandler; private String watchPath; + private int maxThread; /** * Constructs an instance of {@link FileClientHandler} class. * * @param fileReceptionHandler the fileReceptionHandler */ - public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) { + public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath, + final int maxThread) { this.fileReceptionHandler = fileReceptionHandler; this.watchPath = watchPath; + this.maxThread = maxThread; } /** @@ -53,7 +56,7 @@ public class FileClientHandler implements Runnable { @Override public void run() { try { - fileReceptionHandler.initFileWatcher(watchPath); + fileReceptionHandler.initFileWatcher(watchPath, maxThread); } catch (final IOException ex) { LOGGER.error("Failed initializing file watcher thread", ex); } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java index a3f6fab0..0ddd2a92 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java @@ -31,6 +31,8 @@ import java.nio.file.Paths; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.zip.ZipFile; @@ -59,7 +61,9 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { try { final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters = ParameterService.get(parameterGroupName); - final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath()); + final FileClientHandler fileClientHandler = new FileClientHandler(this, + handlerParameters.getWatchPath(), + handlerParameters.getMaxThread()); final Thread fileWatcherThread = new Thread(fileClientHandler); fileWatcherThread.start(); } catch (final Exception ex) { @@ -89,13 +93,13 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { * * @param watchPath Path to watch */ - public void initFileWatcher(final String watchPath) throws IOException { + public void initFileWatcher(final String watchPath, final int maxThread) throws IOException { try (final WatchService watcher = FileSystems.getDefault().newWatchService()) { final Path dir = Paths.get(watchPath); dir.register(watcher, ENTRY_CREATE); LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName()); - startWatchService(watcher, dir); - } catch (final InterruptedException ex) { + startWatchService(watcher, dir, maxThread); + } catch (final Exception ex) { LOGGER.error("FileWatcher initialization failed", ex); Thread.currentThread().interrupt(); } @@ -106,11 +110,16 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { * * @param watcher the watcher * @param dir the watch directory + * @param maxThread the max thread number * @throws InterruptedException if it occurs */ @SuppressWarnings("unchecked") - protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException { + protected void startWatchService(final WatchService watcher, + final Path dir, + int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException { WatchKey key; + ExecutorService pool = Executors.newFixedThreadPool(maxThread); + running = true; while (running) { key = watcher.take(); @@ -118,12 +127,20 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { for (final WatchEvent<?> event : key.pollEvents()) { final WatchEvent<Path> ev = (WatchEvent<Path>) event; final Path fileName = ev.context(); - LOGGER.debug("new CSAR found: {}", fileName); - DistributionStatisticsManager.updateTotalDistributionCount(); - final String fullFilePath = dir.toString() + File.separator + fileName.toString(); - waitForFileToBeReady(fullFilePath); - createPolicyInputAndCallHandler(fullFilePath); - LOGGER.debug("CSAR complete: {}", fileName); + pool.execute(new Runnable() { + public void run() { + LOGGER.debug("new CSAR found: {}", fileName); + DistributionStatisticsManager.updateTotalDistributionCount(); + final String fullFilePath = dir.toString() + File.separator + fileName.toString(); + try { + waitForFileToBeReady(fullFilePath); + createPolicyInputAndCallHandler(fullFilePath); + LOGGER.debug("CSAR complete: {}", fileName); + } catch (InterruptedException e) { + LOGGER.error("waitForFileToBeReady interrupted", e); + } + } + }); } final boolean valid = key.reset(); if (!valid) { @@ -131,6 +148,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { break; } } + pool.shutdown(); } /** diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java index 693ff0ec..415e0d94 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java @@ -27,6 +27,7 @@ package org.onap.policy.distribution.reception.handling.file; public class FileSystemReceptionHandlerConfigurationParameterBuilder { private String watchPath; + private int maxThread = 1; /** * Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. @@ -38,6 +39,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder { return this; } + /** + * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. + * + * @param maxThread the max thread number in the thread pool + */ + public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) { + this.maxThread = maxThread; + return this; + } /** * Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. @@ -47,6 +57,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder { public String getWatchPath() { return watchPath; } + + /** + * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance. + * + * @return the maxThread + */ + public int getMaxThread() { + return maxThread; + } } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java index dd50dc78..f9045816 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java @@ -24,6 +24,7 @@ import java.io.File; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.utils.validation.ParameterValidationUtils; import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup; /** @@ -33,6 +34,7 @@ import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigu public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup { private String watchPath; + private int maxThread; /** * The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class. @@ -42,12 +44,17 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep public FileSystemReceptionHandlerConfigurationParameterGroup( final FileSystemReceptionHandlerConfigurationParameterBuilder builder) { watchPath = builder.getWatchPath(); + maxThread = builder.getMaxThread(); } public String getWatchPath() { return watchPath; } + public int getMaxThread() { + return maxThread; + } + /** * {@inheritDoc}. */ @@ -55,6 +62,9 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep public GroupValidationResult validate() { final GroupValidationResult validationResult = new GroupValidationResult(this); validatePathElement(validationResult, watchPath, "watchPath"); + if (!ParameterValidationUtils.validateIntParameter(maxThread)) { + validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer"); + } return validationResult; } diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java index fdb01007..556b1d6d 100644 --- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java @@ -93,7 +93,8 @@ public class TestFileSystemReceptionHandler { @Test public final void testInit() throws IOException, InterruptedException { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); - Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class)); + Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class), + Mockito.anyInt()); try { sypHandler.initializeReception(pssdConfigParameters.getName()); } catch (final Exception exp) { @@ -106,7 +107,8 @@ public class TestFileSystemReceptionHandler { public final void testDestroy() throws IOException { try { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); - Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class)); + Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class), + Mockito.anyInt()); sypHandler.initializeReception(pssdConfigParameters.getName()); sypHandler.destroy(); } catch (final Exception exp) { @@ -141,7 +143,7 @@ public class TestFileSystemReceptionHandler { final Thread th = new Thread(() -> { try { - sypHandler.initFileWatcher(watchPath); + sypHandler.initFileWatcher(watchPath, 2); } catch (final IOException ex) { LOGGER.error("testMain failed", ex); } diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java index 92d9443e..1d32b191 100644 --- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java +++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java @@ -53,7 +53,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup { validPath = tempFolder.getRoot().getAbsolutePath(); final FileSystemReceptionHandlerConfigurationParameterBuilder builder = - new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath); + new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath).setMaxThread(2); configParameters = new FileSystemReceptionHandlerConfigurationParameterGroup(builder); } catch (final Exception e) { fail("test should not thrown an exception here: " + e.getMessage()); @@ -61,6 +61,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup { final GroupValidationResult validationResult = configParameters.validate(); assertTrue(validationResult.isValid()); assertEquals(validPath, configParameters.getWatchPath()); + assertEquals(2, configParameters.getMaxThread()); } @Test diff --git a/plugins/reception-plugins/src/test/resources/handling-filesystem.json b/plugins/reception-plugins/src/test/resources/handling-filesystem.json index 6a402a7a..5274d8e2 100644 --- a/plugins/reception-plugins/src/test/resources/handling-filesystem.json +++ b/plugins/reception-plugins/src/test/resources/handling-filesystem.json @@ -1,5 +1,6 @@ { "name": "parameterConfig1", - "watchPath": "/tmp" + "watchPath": "/tmp", + "maxThread": 2 } |