aboutsummaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorLianhao Lu <lianhao.lu@intel.com>2019-04-10 14:40:11 +0800
committerLianhao Lu <lianhao.lu@intel.com>2019-04-10 14:40:11 +0800
commit99def6ddf75d44a8c8c02144cb49b346a82bdaf6 (patch)
tree98695a7d4589d0bee5513b62fbdbc4082fe29af4 /plugins
parent42370a06ea7681b6e422b1d5739ec4d6c1287103 (diff)
Add multi-thread support in FileSystemReceptionHander
By adding multi-thread support in FileSystemReceptionHander, we can have a more thorough s3p test to test performance & stablity in multithread situations. Change-Id: Id263435531e26dcbadfbda6f82b26ac54a72ba1a Issue-ID: POLICY-1274 Signed-off-by: Lianhao Lu <lianhao.lu@intel.com>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java7
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java40
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java19
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java10
-rw-r--r--plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java8
-rw-r--r--plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java3
-rw-r--r--plugins/reception-plugins/src/test/resources/handling-filesystem.json3
7 files changed, 72 insertions, 18 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
index fd4f69cc..2072a0eb 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
@@ -36,15 +36,18 @@ public class FileClientHandler implements Runnable {
private FileSystemReceptionHandler fileReceptionHandler;
private String watchPath;
+ private int maxThread;
/**
* Constructs an instance of {@link FileClientHandler} class.
*
* @param fileReceptionHandler the fileReceptionHandler
*/
- public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+ public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath,
+ final int maxThread) {
this.fileReceptionHandler = fileReceptionHandler;
this.watchPath = watchPath;
+ this.maxThread = maxThread;
}
/**
@@ -53,7 +56,7 @@ public class FileClientHandler implements Runnable {
@Override
public void run() {
try {
- fileReceptionHandler.initFileWatcher(watchPath);
+ fileReceptionHandler.initFileWatcher(watchPath, maxThread);
} catch (final IOException ex) {
LOGGER.error("Failed initializing file watcher thread", ex);
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
index a3f6fab0..0ddd2a92 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
@@ -31,6 +31,8 @@ import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipFile;
@@ -59,7 +61,9 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
try {
final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
ParameterService.get(parameterGroupName);
- final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+ final FileClientHandler fileClientHandler = new FileClientHandler(this,
+ handlerParameters.getWatchPath(),
+ handlerParameters.getMaxThread());
final Thread fileWatcherThread = new Thread(fileClientHandler);
fileWatcherThread.start();
} catch (final Exception ex) {
@@ -89,13 +93,13 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
*
* @param watchPath Path to watch
*/
- public void initFileWatcher(final String watchPath) throws IOException {
+ public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
- startWatchService(watcher, dir);
- } catch (final InterruptedException ex) {
+ startWatchService(watcher, dir, maxThread);
+ } catch (final Exception ex) {
LOGGER.error("FileWatcher initialization failed", ex);
Thread.currentThread().interrupt();
}
@@ -106,11 +110,16 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
*
* @param watcher the watcher
* @param dir the watch directory
+ * @param maxThread the max thread number
* @throws InterruptedException if it occurs
*/
@SuppressWarnings("unchecked")
- protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
+ protected void startWatchService(final WatchService watcher,
+ final Path dir,
+ int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException {
WatchKey key;
+ ExecutorService pool = Executors.newFixedThreadPool(maxThread);
+
running = true;
while (running) {
key = watcher.take();
@@ -118,12 +127,20 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
for (final WatchEvent<?> event : key.pollEvents()) {
final WatchEvent<Path> ev = (WatchEvent<Path>) event;
final Path fileName = ev.context();
- LOGGER.debug("new CSAR found: {}", fileName);
- DistributionStatisticsManager.updateTotalDistributionCount();
- final String fullFilePath = dir.toString() + File.separator + fileName.toString();
- waitForFileToBeReady(fullFilePath);
- createPolicyInputAndCallHandler(fullFilePath);
- LOGGER.debug("CSAR complete: {}", fileName);
+ pool.execute(new Runnable() {
+ public void run() {
+ LOGGER.debug("new CSAR found: {}", fileName);
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ try {
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
+ LOGGER.debug("CSAR complete: {}", fileName);
+ } catch (InterruptedException e) {
+ LOGGER.error("waitForFileToBeReady interrupted", e);
+ }
+ }
+ });
}
final boolean valid = key.reset();
if (!valid) {
@@ -131,6 +148,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
break;
}
}
+ pool.shutdown();
}
/**
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
index 693ff0ec..415e0d94 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
@@ -27,6 +27,7 @@ package org.onap.policy.distribution.reception.handling.file;
public class FileSystemReceptionHandlerConfigurationParameterBuilder {
private String watchPath;
+ private int maxThread = 1;
/**
* Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -38,6 +39,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
return this;
}
+ /**
+ * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @param maxThread the max thread number in the thread pool
+ */
+ public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) {
+ this.maxThread = maxThread;
+ return this;
+ }
/**
* Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -47,6 +57,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
public String getWatchPath() {
return watchPath;
}
+
+ /**
+ * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @return the maxThread
+ */
+ public int getMaxThread() {
+ return maxThread;
+ }
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
index dd50dc78..f9045816 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -24,6 +24,7 @@ import java.io.File;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.utils.validation.ParameterValidationUtils;
import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup;
/**
@@ -33,6 +34,7 @@ import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigu
public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup {
private String watchPath;
+ private int maxThread;
/**
* The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
@@ -42,12 +44,17 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
public FileSystemReceptionHandlerConfigurationParameterGroup(
final FileSystemReceptionHandlerConfigurationParameterBuilder builder) {
watchPath = builder.getWatchPath();
+ maxThread = builder.getMaxThread();
}
public String getWatchPath() {
return watchPath;
}
+ public int getMaxThread() {
+ return maxThread;
+ }
+
/**
* {@inheritDoc}.
*/
@@ -55,6 +62,9 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
public GroupValidationResult validate() {
final GroupValidationResult validationResult = new GroupValidationResult(this);
validatePathElement(validationResult, watchPath, "watchPath");
+ if (!ParameterValidationUtils.validateIntParameter(maxThread)) {
+ validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer");
+ }
return validationResult;
}
diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
index fdb01007..556b1d6d 100644
--- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
@@ -93,7 +93,8 @@ public class TestFileSystemReceptionHandler {
@Test
public final void testInit() throws IOException, InterruptedException {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+ Mockito.anyInt());
try {
sypHandler.initializeReception(pssdConfigParameters.getName());
} catch (final Exception exp) {
@@ -106,7 +107,8 @@ public class TestFileSystemReceptionHandler {
public final void testDestroy() throws IOException {
try {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class),
+ Mockito.anyInt());
sypHandler.initializeReception(pssdConfigParameters.getName());
sypHandler.destroy();
} catch (final Exception exp) {
@@ -141,7 +143,7 @@ public class TestFileSystemReceptionHandler {
final Thread th = new Thread(() -> {
try {
- sypHandler.initFileWatcher(watchPath);
+ sypHandler.initFileWatcher(watchPath, 2);
} catch (final IOException ex) {
LOGGER.error("testMain failed", ex);
}
diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
index 92d9443e..1d32b191 100644
--- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -53,7 +53,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup {
validPath = tempFolder.getRoot().getAbsolutePath();
final FileSystemReceptionHandlerConfigurationParameterBuilder builder =
- new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath);
+ new FileSystemReceptionHandlerConfigurationParameterBuilder().setWatchPath(validPath).setMaxThread(2);
configParameters = new FileSystemReceptionHandlerConfigurationParameterGroup(builder);
} catch (final Exception e) {
fail("test should not thrown an exception here: " + e.getMessage());
@@ -61,6 +61,7 @@ public class TestFileSystemReceptionHandlerConfigurationParameterGroup {
final GroupValidationResult validationResult = configParameters.validate();
assertTrue(validationResult.isValid());
assertEquals(validPath, configParameters.getWatchPath());
+ assertEquals(2, configParameters.getMaxThread());
}
@Test
diff --git a/plugins/reception-plugins/src/test/resources/handling-filesystem.json b/plugins/reception-plugins/src/test/resources/handling-filesystem.json
index 6a402a7a..5274d8e2 100644
--- a/plugins/reception-plugins/src/test/resources/handling-filesystem.json
+++ b/plugins/reception-plugins/src/test/resources/handling-filesystem.json
@@ -1,5 +1,6 @@
{
"name": "parameterConfig1",
- "watchPath": "/tmp"
+ "watchPath": "/tmp",
+ "maxThread": 2
}