aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception
diff options
context:
space:
mode:
authorLianhao Lu <lianhao.lu@intel.com>2019-04-10 14:40:11 +0800
committerLianhao Lu <lianhao.lu@intel.com>2019-04-10 14:40:11 +0800
commit99def6ddf75d44a8c8c02144cb49b346a82bdaf6 (patch)
tree98695a7d4589d0bee5513b62fbdbc4082fe29af4 /plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception
parent42370a06ea7681b6e422b1d5739ec4d6c1287103 (diff)
Add multi-thread support in FileSystemReceptionHander
By adding multi-thread support in FileSystemReceptionHander, we can have a more thorough s3p test to test performance & stablity in multithread situations. Change-Id: Id263435531e26dcbadfbda6f82b26ac54a72ba1a Issue-ID: POLICY-1274 Signed-off-by: Lianhao Lu <lianhao.lu@intel.com>
Diffstat (limited to 'plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception')
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java7
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java40
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java19
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java10
4 files changed, 63 insertions, 13 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
index fd4f69cc..2072a0eb 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
@@ -36,15 +36,18 @@ public class FileClientHandler implements Runnable {
private FileSystemReceptionHandler fileReceptionHandler;
private String watchPath;
+ private int maxThread;
/**
* Constructs an instance of {@link FileClientHandler} class.
*
* @param fileReceptionHandler the fileReceptionHandler
*/
- public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+ public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath,
+ final int maxThread) {
this.fileReceptionHandler = fileReceptionHandler;
this.watchPath = watchPath;
+ this.maxThread = maxThread;
}
/**
@@ -53,7 +56,7 @@ public class FileClientHandler implements Runnable {
@Override
public void run() {
try {
- fileReceptionHandler.initFileWatcher(watchPath);
+ fileReceptionHandler.initFileWatcher(watchPath, maxThread);
} catch (final IOException ex) {
LOGGER.error("Failed initializing file watcher thread", ex);
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
index a3f6fab0..0ddd2a92 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
@@ -31,6 +31,8 @@ import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipFile;
@@ -59,7 +61,9 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
try {
final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
ParameterService.get(parameterGroupName);
- final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+ final FileClientHandler fileClientHandler = new FileClientHandler(this,
+ handlerParameters.getWatchPath(),
+ handlerParameters.getMaxThread());
final Thread fileWatcherThread = new Thread(fileClientHandler);
fileWatcherThread.start();
} catch (final Exception ex) {
@@ -89,13 +93,13 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
*
* @param watchPath Path to watch
*/
- public void initFileWatcher(final String watchPath) throws IOException {
+ public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
- startWatchService(watcher, dir);
- } catch (final InterruptedException ex) {
+ startWatchService(watcher, dir, maxThread);
+ } catch (final Exception ex) {
LOGGER.error("FileWatcher initialization failed", ex);
Thread.currentThread().interrupt();
}
@@ -106,11 +110,16 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
*
* @param watcher the watcher
* @param dir the watch directory
+ * @param maxThread the max thread number
* @throws InterruptedException if it occurs
*/
@SuppressWarnings("unchecked")
- protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
+ protected void startWatchService(final WatchService watcher,
+ final Path dir,
+ int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException {
WatchKey key;
+ ExecutorService pool = Executors.newFixedThreadPool(maxThread);
+
running = true;
while (running) {
key = watcher.take();
@@ -118,12 +127,20 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
for (final WatchEvent<?> event : key.pollEvents()) {
final WatchEvent<Path> ev = (WatchEvent<Path>) event;
final Path fileName = ev.context();
- LOGGER.debug("new CSAR found: {}", fileName);
- DistributionStatisticsManager.updateTotalDistributionCount();
- final String fullFilePath = dir.toString() + File.separator + fileName.toString();
- waitForFileToBeReady(fullFilePath);
- createPolicyInputAndCallHandler(fullFilePath);
- LOGGER.debug("CSAR complete: {}", fileName);
+ pool.execute(new Runnable() {
+ public void run() {
+ LOGGER.debug("new CSAR found: {}", fileName);
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ try {
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
+ LOGGER.debug("CSAR complete: {}", fileName);
+ } catch (InterruptedException e) {
+ LOGGER.error("waitForFileToBeReady interrupted", e);
+ }
+ }
+ });
}
final boolean valid = key.reset();
if (!valid) {
@@ -131,6 +148,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
break;
}
}
+ pool.shutdown();
}
/**
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
index 693ff0ec..415e0d94 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
@@ -27,6 +27,7 @@ package org.onap.policy.distribution.reception.handling.file;
public class FileSystemReceptionHandlerConfigurationParameterBuilder {
private String watchPath;
+ private int maxThread = 1;
/**
* Set watchPath to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -38,6 +39,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
return this;
}
+ /**
+ * Set maxThread to this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @param maxThread the max thread number in the thread pool
+ */
+ public FileSystemReceptionHandlerConfigurationParameterBuilder setMaxThread(final int maxThread) {
+ this.maxThread = maxThread;
+ return this;
+ }
/**
* Returns the watchPath of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
@@ -47,6 +57,15 @@ public class FileSystemReceptionHandlerConfigurationParameterBuilder {
public String getWatchPath() {
return watchPath;
}
+
+ /**
+ * Returns the maxThread of this {@link FileSystemReceptionHandlerConfigurationParameterBuilder} instance.
+ *
+ * @return the maxThread
+ */
+ public int getMaxThread() {
+ return maxThread;
+ }
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
index dd50dc78..f9045816 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -24,6 +24,7 @@ import java.io.File;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.utils.validation.ParameterValidationUtils;
import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigurationParameterGroup;
/**
@@ -33,6 +34,7 @@ import org.onap.policy.distribution.reception.parameters.ReceptionHandlerConfigu
public class FileSystemReceptionHandlerConfigurationParameterGroup extends ReceptionHandlerConfigurationParameterGroup {
private String watchPath;
+ private int maxThread;
/**
* The constructor for instantiating {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
@@ -42,12 +44,17 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
public FileSystemReceptionHandlerConfigurationParameterGroup(
final FileSystemReceptionHandlerConfigurationParameterBuilder builder) {
watchPath = builder.getWatchPath();
+ maxThread = builder.getMaxThread();
}
public String getWatchPath() {
return watchPath;
}
+ public int getMaxThread() {
+ return maxThread;
+ }
+
/**
* {@inheritDoc}.
*/
@@ -55,6 +62,9 @@ public class FileSystemReceptionHandlerConfigurationParameterGroup extends Recep
public GroupValidationResult validate() {
final GroupValidationResult validationResult = new GroupValidationResult(this);
validatePathElement(validationResult, watchPath, "watchPath");
+ if (!ParameterValidationUtils.validateIntParameter(maxThread)) {
+ validationResult.setResult("maxThread", ValidationStatus.INVALID, "must be a positive integer");
+ }
return validationResult;
}