aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/reception-plugins/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/reception-plugins/src')
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java73
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java2
-rw-r--r--plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java53
3 files changed, 82 insertions, 46 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java
index b1a95fac..db0b0b7b 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java
@@ -43,7 +43,7 @@ import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
* Handles reception of inputs from File System which can be used to decode policies.
*/
public class FileSystemReceptionHandler extends AbstractReceptionHandler {
- private boolean running = true;
+ private boolean running = false;
private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class);
@Override
@@ -56,6 +56,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
} catch (final Exception ex) {
LOGGER.error(ex);
}
+ running = false;
LOGGER.debug("FileSystemReceptionHandler main loop exited...");
}
@@ -65,53 +66,57 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
running = false;
}
+ public boolean isRunning() {
+ return running;
+ }
+
/**
* Main entry point.
*
* @param watchPath Path to watch
*/
@SuppressWarnings("unchecked")
- public void main(String watchPath) {
+ public void main(String watchPath) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
-
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: " + dir.getFileName());
- while (running) {
- WatchKey key;
- try {
- key = watcher.take();
- } catch (final InterruptedException ex) {
- LOGGER.debug(ex);
- Thread.currentThread().interrupt();
- return;
- }
+ startMainLoop(watcher, dir);
+ } catch (final InterruptedException ex) {
+ LOGGER.debug(ex);
+ Thread.currentThread().interrupt();
+ }
+ }
- for (final WatchEvent<?> event : key.pollEvents()) {
- final WatchEvent.Kind<?> kind = event.kind();
- final WatchEvent<Path> ev = (WatchEvent<Path>) event;
- final Path fileName = ev.context();
- try {
- LOGGER.debug("new CSAR found: " + kind.name() + ": " + fileName);
- createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString());
- LOGGER.debug("CSAR complete: " + kind.name() + ": " + fileName);
- } catch (final PolicyDecodingException ex) {
- LOGGER.error(ex);
- }
- }
- final boolean valid = key.reset();
- if (!valid) {
- LOGGER.error("Watch key no longer valid!");
- break;
- }
+ @SuppressWarnings("unchecked")
+ protected void startMainLoop(WatchService watcher, Path dir) throws InterruptedException {
+ WatchKey key;
+ running = true;
+ while (running) {
+ key = watcher.take();
+
+ for (final WatchEvent<?> event : key.pollEvents()) {
+ final WatchEvent.Kind<?> kind = event.kind();
+ final WatchEvent<Path> ev = (WatchEvent<Path>) event;
+ final Path fileName = ev.context();
+ LOGGER.debug("new CSAR found: " + fileName);
+ createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString());
+ LOGGER.debug("CSAR complete: " + fileName);
+ }
+ final boolean valid = key.reset();
+ if (!valid) {
+ LOGGER.error("Watch key no longer valid!");
+ break;
}
- } catch (final IOException ex) {
- LOGGER.error(ex);
}
}
- protected void createPolicyInputAndCallHandler(final String fileName) throws PolicyDecodingException {
- final Csar csarObject = new Csar(fileName);
- inputReceived(csarObject);
+ protected void createPolicyInputAndCallHandler(final String fileName) {
+ try {
+ final Csar csarObject = new Csar(fileName);
+ inputReceived(csarObject);
+ } catch (final PolicyDecodingException ex) {
+ LOGGER.error(ex);
+ }
}
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java
index b017cae6..37a16980 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java
@@ -20,8 +20,6 @@
package org.onap.policy.distribution.reception.handling.sdc;
-import java.util.List;
-
/**
* This class builds an instance of {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
*
diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java
index 8edca74e..29eb3f7e 100644
--- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
@@ -38,7 +39,9 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
import org.onap.policy.common.parameters.ParameterService;
@@ -89,9 +92,9 @@ public class TestFileSystemReceptionHandler {
public void teardown() {
ParameterService.deregister(pssdConfigParameters);
}
-
+
@Test
- public final void testInit() {
+ public final void testInit() throws IOException {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class));
sypHandler.initializeReception(pssdConfigParameters.getName());
@@ -99,7 +102,7 @@ public class TestFileSystemReceptionHandler {
}
@Test
- public final void testDestroy() {
+ public final void testDestroy() throws IOException {
try {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class));
@@ -114,23 +117,53 @@ public class TestFileSystemReceptionHandler {
@Test
public void testMain() throws IOException, PolicyDecodingException {
+ final Object lock = new Object();
+ final String watchPath = tempFolder.getRoot().getAbsolutePath().toString();
+
+ class Processed {
+ public boolean processed = false;
+ }
+
+ Processed cond = new Processed();
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class));
+ Mockito.doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ synchronized (lock) {
+ cond.processed = true;
+ lock.notifyAll();
+ }
+ return null;
+ }
+ }).when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class));
- final String watchPath = tempFolder.getRoot().getAbsolutePath().toString();
Thread th = new Thread(() -> {
- sypHandler.main(watchPath);
+ try {
+ sypHandler.main(watchPath);
+ } catch (IOException ex) {
+ LOGGER.error(ex);
+ }
});
th.start();
try {
- // yield to main thread
- Thread.sleep(1000);
+ //wait until internal watch service started or counter reached
+ AtomicInteger counter = new AtomicInteger();
+ counter.set(0);
+ synchronized (lock) {
+ while (!sypHandler.isRunning() && counter.getAndIncrement() < 10) {
+ lock.wait(1000);
+ }
+ }
Files.copy(Paths.get("src/test/resources/hpaPolicyHugePage.csar"),
Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar"));
- // wait enough time
- Thread.sleep(1000);
+ //wait until mock method triggered or counter reached
+ counter.set(0);
+ synchronized (lock) {
+ while (!cond.processed && counter.getAndIncrement() < 10) {
+ lock.wait(1000);
+ }
+ }
sypHandler.destroy();
th.interrupt();
th.join();