diff options
author | ramverma <ram.krishna.verma@est.tech> | 2019-01-22 12:39:39 +0000 |
---|---|---|
committer | ramverma <ram.krishna.verma@est.tech> | 2019-01-22 12:39:39 +0000 |
commit | 1936220d00ee644774e58ab33de39cca4a006d2a (patch) | |
tree | 59f9cd0d337a88424813e9eacb37a94720f15acd /plugins/reception-plugins/src | |
parent | eac48bb3c587999376af202ab421275a27279523 (diff) |
Fix issues in policy-distribution
1) Creating a new thread for watching directory path for new file.
2) Updating distribution statistics from FileSystemReceptionHandler.
Change-Id: Ic539f2cad015f0756407fe910f309a2ea661a764
Issue-ID: POLICY-1437
Signed-off-by: ramverma <ram.krishna.verma@est.tech>
Diffstat (limited to 'plugins/reception-plugins/src')
-rw-r--r-- | plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java | 61 | ||||
-rw-r--r-- | plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java) | 74 | ||||
-rw-r--r-- | plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java) | 2 | ||||
-rw-r--r-- | plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java) | 2 | ||||
-rw-r--r-- | plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java (renamed from plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java) | 31 | ||||
-rw-r--r-- | plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java (renamed from plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java) | 4 |
6 files changed, 141 insertions, 33 deletions
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java new file mode 100644 index 00000000..f8e57747 --- /dev/null +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java @@ -0,0 +1,61 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.distribution.reception.handling.file; + +import java.io.IOException; + +import org.onap.policy.common.logging.flexlogger.FlexLogger; +import org.onap.policy.common.logging.flexlogger.Logger; + +/** + * This class implements Runnable interface for creating new thread which will be used as file watcher. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class FileClientHandler implements Runnable { + + private static final Logger LOGGER = FlexLogger.getLogger(FileClientHandler.class); + + private FileSystemReceptionHandler fileReceptionHandler; + private String watchPath; + + /** + * Constructs an instance of {@link FileClientHandler} class. + * + * @param fileReceptionHandler the fileReceptionHandler + */ + public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) { + this.fileReceptionHandler = fileReceptionHandler; + this.watchPath = watchPath; + } + + /** + * {@inheritDoc}. + */ + @Override + public void run() { + try { + fileReceptionHandler.initFileWatcher(watchPath); + } catch (final IOException ex) { + LOGGER.error(ex); + } + } +} diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java index 941cdd61..3cb167f3 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Intel Corp. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.distribution.reception.handling.sdc; +package org.onap.policy.distribution.reception.handling.file; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; @@ -30,65 +31,85 @@ import java.nio.file.Paths; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.util.concurrent.TimeUnit; +import java.util.zip.ZipFile; import org.onap.policy.common.logging.flexlogger.FlexLogger; import org.onap.policy.common.logging.flexlogger.Logger; - import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.distribution.model.Csar; import org.onap.policy.distribution.reception.decoding.PolicyDecodingException; import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler; +import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager; /** * Handles reception of inputs from File System which can be used to decode policies. */ public class FileSystemReceptionHandler extends AbstractReceptionHandler { - private boolean running = false; + private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class); + private boolean running = false; + /** + * {@inheritDoc}. + */ @Override protected void initializeReception(final String parameterGroupName) { LOGGER.debug("FileSystemReceptionHandler init..."); try { final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters = - ParameterService.get(parameterGroupName); - main(handlerParameters.getWatchPath()); + ParameterService.get(parameterGroupName); + final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath()); + final Thread fileWatcherThread = new Thread(fileClientHandler); + fileWatcherThread.start(); } catch (final Exception ex) { LOGGER.error(ex); } - running = false; - LOGGER.debug("FileSystemReceptionHandler main loop exited..."); } + /** + * {@inheritDoc}. + */ @Override public void destroy() { - // Tear down subscription etc running = false; } + /** + * Method to check the running status of file watcher thread. + * + * @return the running status + */ public boolean isRunning() { return running; } /** - * Main entry point. - * + * Initialize the file watcher thread. + * * @param watchPath Path to watch */ - public void main(String watchPath) throws IOException { + public void initFileWatcher(final String watchPath) throws IOException { try (final WatchService watcher = FileSystems.getDefault().newWatchService()) { final Path dir = Paths.get(watchPath); dir.register(watcher, ENTRY_CREATE); LOGGER.debug("Watch Service registered for dir: " + dir.getFileName()); - startMainLoop(watcher, dir); + startWatchService(watcher, dir); } catch (final InterruptedException ex) { LOGGER.debug(ex); Thread.currentThread().interrupt(); } } + /** + * Method to keep watching the given path for any new file created. + * + * @param watcher the watcher + * @param dir the watch directory + * @throws InterruptedException if it occurs + */ @SuppressWarnings("unchecked") - protected void startMainLoop(WatchService watcher, Path dir) throws InterruptedException { + protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException { WatchKey key; running = true; while (running) { @@ -98,7 +119,10 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { final WatchEvent<Path> ev = (WatchEvent<Path>) event; final Path fileName = ev.context(); LOGGER.debug("new CSAR found: " + fileName); - createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString()); + DistributionStatisticsManager.updateTotalDistributionCount(); + final String fullFilePath = dir.toString() + File.separator + fileName.toString(); + waitForFileToBeReady(fullFilePath); + createPolicyInputAndCallHandler(fullFilePath); LOGGER.debug("CSAR complete: " + fileName); } final boolean valid = key.reset(); @@ -109,12 +133,34 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler { } } + /** + * Method to create policy input & call policy handlers. + * + * @param fileName the filename + */ protected void createPolicyInputAndCallHandler(final String fileName) { try { final Csar csarObject = new Csar(fileName); + DistributionStatisticsManager.updateTotalDownloadCount(); inputReceived(csarObject); + DistributionStatisticsManager.updateDownloadSuccessCount(); + DistributionStatisticsManager.updateDistributionSuccessCount(); } catch (final PolicyDecodingException ex) { + DistributionStatisticsManager.updateDownloadFailureCount(); + DistributionStatisticsManager.updateDistributionFailureCount(); LOGGER.error(ex); } } + + private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException { + boolean flag = true; + while (flag) { + TimeUnit.MILLISECONDS.sleep(100); + try (ZipFile zipFile = new ZipFile(fullFilePath)) { + flag = false; + } catch (final IOException exp) { + LOGGER.error("file is not ready for reading, wait for sometime and try again", exp); + } + } + } } diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java index 37a16980..693ff0ec 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.distribution.reception.handling.sdc; +package org.onap.policy.distribution.reception.handling.file; /** * This class builds an instance of {@link FileSystemReceptionHandlerConfigurationParameterGroup} class. diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java index 457cd5ea..dd50dc78 100644 --- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java +++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.distribution.reception.handling.sdc; +package org.onap.policy.distribution.reception.handling.file; import java.io.File; diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java index fc2a2b6a..20922a1b 100644 --- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java +++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Intel. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.distribution.reception.handling.sdc; +package org.onap.policy.distribution.reception.handling.file; import static org.junit.Assert.fail; @@ -92,16 +93,16 @@ public class TestFileSystemReceptionHandler { @Test public final void testInit() throws IOException { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); - Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class)); + Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class)); sypHandler.initializeReception(pssdConfigParameters.getName()); - Mockito.verify(sypHandler, Mockito.times(1)).main(Mockito.isA(String.class)); + Mockito.verify(sypHandler, Mockito.times(1)).initFileWatcher(Mockito.isA(String.class)); } @Test public final void testDestroy() throws IOException { try { final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); - Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class)); + Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class)); sypHandler.initializeReception(pssdConfigParameters.getName()); sypHandler.destroy(); } catch (final Exception exp) { @@ -120,11 +121,12 @@ public class TestFileSystemReceptionHandler { public boolean processed = false; } - Processed cond = new Processed(); + final Processed cond = new Processed(); final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler); Mockito.doAnswer(new Answer<Object>() { - public Object answer(InvocationOnMock invocation) { + @Override + public Object answer(final InvocationOnMock invocation) { synchronized (lock) { cond.processed = true; lock.notifyAll(); @@ -133,18 +135,18 @@ public class TestFileSystemReceptionHandler { } }).when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class)); - Thread th = new Thread(() -> { + final Thread th = new Thread(() -> { try { - sypHandler.main(watchPath); - } catch (IOException ex) { + sypHandler.initFileWatcher(watchPath); + } catch (final IOException ex) { LOGGER.error(ex); } }); th.start(); try { - //wait until internal watch service started or counter reached - AtomicInteger counter = new AtomicInteger(); + // wait until internal watch service started or counter reached + final AtomicInteger counter = new AtomicInteger(); counter.set(0); synchronized (lock) { while (!sypHandler.isRunning() && counter.getAndIncrement() < 10) { @@ -152,8 +154,8 @@ public class TestFileSystemReceptionHandler { } } Files.copy(Paths.get("src/test/resources/hpaPolicyHugePage.csar"), - Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar")); - //wait until mock method triggered or counter reached + Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar")); + // wait until mock method triggered or counter reached counter.set(0); synchronized (lock) { while (!cond.processed && counter.getAndIncrement() < 10) { @@ -166,8 +168,7 @@ public class TestFileSystemReceptionHandler { } catch (final InterruptedException ex) { LOGGER.error(ex); } - Mockito.verify(sypHandler, Mockito.times(1)) - .createPolicyInputAndCallHandler(Mockito.isA(String.class)); + Mockito.verify(sypHandler, Mockito.times(1)).createPolicyInputAndCallHandler(Mockito.isA(String.class)); } } diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java index 3039560a..92d9443e 100644 --- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java +++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2018 Intel. All rights reserved. + * Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +19,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.distribution.reception.handling.sdc; +package org.onap.policy.distribution.reception.handling.file; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,7 +35,6 @@ import java.io.IOException; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; - import org.onap.policy.common.parameters.GroupValidationResult; /** |