diff options
Diffstat (limited to 'plugins')
3 files changed, 82 insertions, 46 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java index b1a95fac..db0b0b7b 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java @@ -43,7 +43,7 @@ import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler; * Handles reception of inputs from File System which can be used to decode policies. */ public class FileSystemReceptionHandler extends AbstractReceptionHandler { - private boolean running = true; + private boolean running = false; private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class); @Override @@ -56,6 +56,7 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { } catch (final Exception ex) { LOGGER.error(ex); } + running = false; LOGGER.debug("FileSystemReceptionHandler main loop exited..."); } @@ -65,53 +66,57 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { running = false; } + public boolean isRunning() { + return running; + } + /** * Main entry point. * * @param watchPath Path to watch */ @SuppressWarnings("unchecked") - public void main(String watchPath) { + public void main(String watchPath) throws IOException { try (final WatchService watcher = FileSystems.getDefault().newWatchService()) { final Path dir = Paths.get(watchPath); - dir.register(watcher, ENTRY_CREATE); LOGGER.debug("Watch Service registered for dir: " + dir.getFileName()); - while (running) { - WatchKey key; - try { - key = watcher.take(); - } catch (final InterruptedException ex) { - LOGGER.debug(ex); - Thread.currentThread().interrupt(); - return; - } + startMainLoop(watcher, dir); + } catch (final InterruptedException ex) { + LOGGER.debug(ex); + Thread.currentThread().interrupt(); + } + } - for (final WatchEvent<?> event : key.pollEvents()) { - final WatchEvent.Kind<?> kind = event.kind(); - final WatchEvent<Path> ev = (WatchEvent<Path>) event; - final Path fileName = ev.context(); - try { - LOGGER.debug("new CSAR found: " + kind.name() + ": " + fileName); - createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString()); - LOGGER.debug("CSAR complete: " + kind.name() + ": " + fileName); - } catch (final PolicyDecodingException ex) { - LOGGER.error(ex); - } - } - final boolean valid = key.reset(); - if (!valid) { - LOGGER.error("Watch key no longer valid!"); - break; - } + @SuppressWarnings("unchecked") + protected void startMainLoop(WatchService watcher, Path dir) throws InterruptedException { + WatchKey key; + running = true; + while (running) { + key = watcher.take(); + + for (final WatchEvent<?> event : key.pollEvents()) { + final WatchEvent.Kind<?> kind = event.kind(); + final WatchEvent<Path> ev = (WatchEvent<Path>) event; + final Path fileName = ev.context(); + LOGGER.debug("new CSAR found: " + fileName); + createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString()); + LOGGER.debug("CSAR complete: " + fileName); + } + final boolean valid = key.reset(); + if (!valid) { + LOGGER.error("Watch key no longer valid!"); + break; } - } catch (final IOException ex) { - LOGGER.error(ex); } } - protected void createPolicyInputAndCallHandler(final String fileName) throws PolicyDecodingException { - final Csar csarObject = new Csar(fileName); - inputReceived(csarObject); + protected void createPolicyInputAndCallHandler(final String fileName) { + try { + final Csar csarObject = new Csar(fileName); + inputReceived(csarObject); + } catch (final PolicyDecodingException ex) { + LOGGER.error(ex); + } } } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java index b017cae6..37a16980 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java @@ -20,8 +20,6 @@ package org.onap.policy.distribution.reception.handling.sdc; -import java.util.List; - /** * This class builds an instance of {@link FileSystemReceptionHandlerConfigurationParameterGroup} class. * diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java index 8edca74e..29eb3f7e 100644 --- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; @@ -38,7 +39,9 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; import org.onap.policy.common.parameters.ParameterService; @@ -89,9 +92,9 @@ public class TestFileSystemReceptionHandler { public void teardown() { ParameterService.deregister(pssdConfigParameters); } - + @Test - public final void testInit() { + public final void testInit() throws IOException { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class)); sypHandler.initializeReception(pssdConfigParameters.getName()); @@ -99,7 +102,7 @@ public class TestFileSystemReceptionHandler { } @Test - public final void testDestroy() { + public final void testDestroy() throws IOException { try { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class)); @@ -114,23 +117,53 @@ public class TestFileSystemReceptionHandler { @Test public void testMain() throws IOException, PolicyDecodingException { + final Object lock = new Object(); + final String watchPath = tempFolder.getRoot().getAbsolutePath().toString(); + + class Processed { + public boolean processed = false; + } + + Processed cond = new Processed(); final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); - Mockito.doNothing().when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class)); + Mockito.doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + synchronized (lock) { + cond.processed = true; + lock.notifyAll(); + } + return null; + } + }).when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class)); - final String watchPath = tempFolder.getRoot().getAbsolutePath().toString(); Thread th = new Thread(() -> { - sypHandler.main(watchPath); + try { + sypHandler.main(watchPath); + } catch (IOException ex) { + LOGGER.error(ex); + } }); th.start(); try { - // yield to main thread - Thread.sleep(1000); + //wait until internal watch service started or counter reached + AtomicInteger counter = new AtomicInteger(); + counter.set(0); + synchronized (lock) { + while (!sypHandler.isRunning() && counter.getAndIncrement() < 10) { + lock.wait(1000); + } + } Files.copy(Paths.get("src/test/resources/hpaPolicyHugePage.csar"), Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar")); - // wait enough time - Thread.sleep(1000); + //wait until mock method triggered or counter reached + counter.set(0); + synchronized (lock) { + while (!cond.processed && counter.getAndIncrement() < 10) { + lock.wait(1000); + } + } sypHandler.destroy(); th.interrupt(); th.join(); |