summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorramverma <ram.krishna.verma@est.tech>2019-01-22 12:39:39 +0000
committerramverma <ram.krishna.verma@est.tech>2019-01-22 12:39:39 +0000
commit1936220d00ee644774e58ab33de39cca4a006d2a (patch)
tree59f9cd0d337a88424813e9eacb37a94720f15acd
parenteac48bb3c587999376af202ab421275a27279523 (diff)
Fix issues in policy-distribution
1) Creating a new thread for watching directory path for new file. 2) Updating distribution statistics from FileSystemReceptionHandler. Change-Id: Ic539f2cad015f0756407fe910f309a2ea661a764 Issue-ID: POLICY-1437 Signed-off-by: ramverma <ram.krishna.verma@est.tech>
-rw-r--r--packages/policy-distribution-tarball/src/main/resources/etc/s3pConfig.json4
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java61
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java)74
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java)2
-rw-r--r--plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java (renamed from plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java)2
-rw-r--r--plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java (renamed from plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java)31
-rw-r--r--plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java (renamed from plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java)4
7 files changed, 143 insertions, 35 deletions
diff --git a/packages/policy-distribution-tarball/src/main/resources/etc/s3pConfig.json b/packages/policy-distribution-tarball/src/main/resources/etc/s3pConfig.json
index 6124ffb0..09d28069 100644
--- a/packages/policy-distribution-tarball/src/main/resources/etc/s3pConfig.json
+++ b/packages/policy-distribution-tarball/src/main/resources/etc/s3pConfig.json
@@ -9,7 +9,7 @@
"receptionHandlerParameters":{
"S3PReceptionHandler":{
"receptionHandlerType":"S3P",
- "receptionHandlerClassName":"org.onap.policy.distribution.reception.handling.sdc.FileSystemReceptionHandler",
+ "receptionHandlerClassName":"org.onap.policy.distribution.reception.handling.file.FileSystemReceptionHandler",
"receptionHandlerConfigurationName":"fileConfiguration",
"pluginHandlerParameters":{
"policyDecoders":{
@@ -31,7 +31,7 @@
},
"receptionHandlerConfigurationParameters":{
"fileConfiguration":{
- "parameterClassName":"org.onap.policy.distribution.reception.handling.sdc.FileSystemReceptionHandlerConfigurationParameterGroup",
+ "parameterClassName":"org.onap.policy.distribution.reception.handling.file.FileSystemReceptionHandlerConfigurationParameterGroup",
"parameters":{
"watchPath": "/tmp/policy_distribution/csar/"
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
new file mode 100644
index 00000000..f8e57747
--- /dev/null
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileClientHandler.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distribution.reception.handling.file;
+
+import java.io.IOException;
+
+import org.onap.policy.common.logging.flexlogger.FlexLogger;
+import org.onap.policy.common.logging.flexlogger.Logger;
+
+/**
+ * This class implements Runnable interface for creating new thread which will be used as file watcher.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class FileClientHandler implements Runnable {
+
+ private static final Logger LOGGER = FlexLogger.getLogger(FileClientHandler.class);
+
+ private FileSystemReceptionHandler fileReceptionHandler;
+ private String watchPath;
+
+ /**
+ * Constructs an instance of {@link FileClientHandler} class.
+ *
+ * @param fileReceptionHandler the fileReceptionHandler
+ */
+ public FileClientHandler(final FileSystemReceptionHandler fileReceptionHandler, final String watchPath) {
+ this.fileReceptionHandler = fileReceptionHandler;
+ this.watchPath = watchPath;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void run() {
+ try {
+ fileReceptionHandler.initFileWatcher(watchPath);
+ } catch (final IOException ex) {
+ LOGGER.error(ex);
+ }
+ }
+}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
index 941cdd61..3cb167f3 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandler.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Intel Corp. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
@@ -30,65 +31,85 @@ import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipFile;
import org.onap.policy.common.logging.flexlogger.FlexLogger;
import org.onap.policy.common.logging.flexlogger.Logger;
-
import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.distribution.model.Csar;
import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
+import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
/**
* Handles reception of inputs from File System which can be used to decode policies.
*/
public class FileSystemReceptionHandler extends AbstractReceptionHandler {
- private boolean running = false;
+
private static final Logger LOGGER = FlexLogger.getLogger(FileSystemReceptionHandler.class);
+ private boolean running = false;
+ /**
+ * {@inheritDoc}.
+ */
@Override
protected void initializeReception(final String parameterGroupName) {
LOGGER.debug("FileSystemReceptionHandler init...");
try {
final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
- ParameterService.get(parameterGroupName);
- main(handlerParameters.getWatchPath());
+ ParameterService.get(parameterGroupName);
+ final FileClientHandler fileClientHandler = new FileClientHandler(this, handlerParameters.getWatchPath());
+ final Thread fileWatcherThread = new Thread(fileClientHandler);
+ fileWatcherThread.start();
} catch (final Exception ex) {
LOGGER.error(ex);
}
- running = false;
- LOGGER.debug("FileSystemReceptionHandler main loop exited...");
}
+ /**
+ * {@inheritDoc}.
+ */
@Override
public void destroy() {
- // Tear down subscription etc
running = false;
}
+ /**
+ * Method to check the running status of file watcher thread.
+ *
+ * @return the running status
+ */
public boolean isRunning() {
return running;
}
/**
- * Main entry point.
- *
+ * Initialize the file watcher thread.
+ *
* @param watchPath Path to watch
*/
- public void main(String watchPath) throws IOException {
+ public void initFileWatcher(final String watchPath) throws IOException {
try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
final Path dir = Paths.get(watchPath);
dir.register(watcher, ENTRY_CREATE);
LOGGER.debug("Watch Service registered for dir: " + dir.getFileName());
- startMainLoop(watcher, dir);
+ startWatchService(watcher, dir);
} catch (final InterruptedException ex) {
LOGGER.debug(ex);
Thread.currentThread().interrupt();
}
}
+ /**
+ * Method to keep watching the given path for any new file created.
+ *
+ * @param watcher the watcher
+ * @param dir the watch directory
+ * @throws InterruptedException if it occurs
+ */
@SuppressWarnings("unchecked")
- protected void startMainLoop(WatchService watcher, Path dir) throws InterruptedException {
+ protected void startWatchService(final WatchService watcher, final Path dir) throws InterruptedException {
WatchKey key;
running = true;
while (running) {
@@ -98,7 +119,10 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
final WatchEvent<Path> ev = (WatchEvent<Path>) event;
final Path fileName = ev.context();
LOGGER.debug("new CSAR found: " + fileName);
- createPolicyInputAndCallHandler(dir.toString() + File.separator + fileName.toString());
+ DistributionStatisticsManager.updateTotalDistributionCount();
+ final String fullFilePath = dir.toString() + File.separator + fileName.toString();
+ waitForFileToBeReady(fullFilePath);
+ createPolicyInputAndCallHandler(fullFilePath);
LOGGER.debug("CSAR complete: " + fileName);
}
final boolean valid = key.reset();
@@ -109,12 +133,34 @@ public class FileSystemReceptionHandler extends AbstractReceptionHandler {
}
}
+ /**
+ * Method to create policy input & call policy handlers.
+ *
+ * @param fileName the filename
+ */
protected void createPolicyInputAndCallHandler(final String fileName) {
try {
final Csar csarObject = new Csar(fileName);
+ DistributionStatisticsManager.updateTotalDownloadCount();
inputReceived(csarObject);
+ DistributionStatisticsManager.updateDownloadSuccessCount();
+ DistributionStatisticsManager.updateDistributionSuccessCount();
} catch (final PolicyDecodingException ex) {
+ DistributionStatisticsManager.updateDownloadFailureCount();
+ DistributionStatisticsManager.updateDistributionFailureCount();
LOGGER.error(ex);
}
}
+
+ private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
+ boolean flag = true;
+ while (flag) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ try (ZipFile zipFile = new ZipFile(fullFilePath)) {
+ flag = false;
+ } catch (final IOException exp) {
+ LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
+ }
+ }
+ }
}
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
index 37a16980..693ff0ec 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterBuilder.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterBuilder.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
/**
* This class builds an instance of {@link FileSystemReceptionHandlerConfigurationParameterGroup} class.
diff --git a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
index 457cd5ea..dd50dc78 100644
--- a/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/sdc/FileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/main/java/org/onap/policy/distribution/reception/handling/file/FileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import java.io.File;
diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
index fc2a2b6a..20922a1b 100644
--- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandler.java
+++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandler.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Intel. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import static org.junit.Assert.fail;
@@ -92,16 +93,16 @@ public class TestFileSystemReceptionHandler {
@Test
public final void testInit() throws IOException {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
sypHandler.initializeReception(pssdConfigParameters.getName());
- Mockito.verify(sypHandler, Mockito.times(1)).main(Mockito.isA(String.class));
+ Mockito.verify(sypHandler, Mockito.times(1)).initFileWatcher(Mockito.isA(String.class));
}
@Test
public final void testDestroy() throws IOException {
try {
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
- Mockito.doNothing().when(sypHandler).main(Mockito.isA(String.class));
+ Mockito.doNothing().when(sypHandler).initFileWatcher(Mockito.isA(String.class));
sypHandler.initializeReception(pssdConfigParameters.getName());
sypHandler.destroy();
} catch (final Exception exp) {
@@ -120,11 +121,12 @@ public class TestFileSystemReceptionHandler {
public boolean processed = false;
}
- Processed cond = new Processed();
+ final Processed cond = new Processed();
final FileSystemReceptionHandler sypHandler = Mockito.spy(fileSystemHandler);
Mockito.doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
+ @Override
+ public Object answer(final InvocationOnMock invocation) {
synchronized (lock) {
cond.processed = true;
lock.notifyAll();
@@ -133,18 +135,18 @@ public class TestFileSystemReceptionHandler {
}
}).when(sypHandler).createPolicyInputAndCallHandler(Mockito.isA(String.class));
- Thread th = new Thread(() -> {
+ final Thread th = new Thread(() -> {
try {
- sypHandler.main(watchPath);
- } catch (IOException ex) {
+ sypHandler.initFileWatcher(watchPath);
+ } catch (final IOException ex) {
LOGGER.error(ex);
}
});
th.start();
try {
- //wait until internal watch service started or counter reached
- AtomicInteger counter = new AtomicInteger();
+ // wait until internal watch service started or counter reached
+ final AtomicInteger counter = new AtomicInteger();
counter.set(0);
synchronized (lock) {
while (!sypHandler.isRunning() && counter.getAndIncrement() < 10) {
@@ -152,8 +154,8 @@ public class TestFileSystemReceptionHandler {
}
}
Files.copy(Paths.get("src/test/resources/hpaPolicyHugePage.csar"),
- Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar"));
- //wait until mock method triggered or counter reached
+ Paths.get(watchPath + File.separator + "hpaPolicyHugePage.csar"));
+ // wait until mock method triggered or counter reached
counter.set(0);
synchronized (lock) {
while (!cond.processed && counter.getAndIncrement() < 10) {
@@ -166,8 +168,7 @@ public class TestFileSystemReceptionHandler {
} catch (final InterruptedException ex) {
LOGGER.error(ex);
}
- Mockito.verify(sypHandler, Mockito.times(1))
- .createPolicyInputAndCallHandler(Mockito.isA(String.class));
+ Mockito.verify(sypHandler, Mockito.times(1)).createPolicyInputAndCallHandler(Mockito.isA(String.class));
}
}
diff --git a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
index 3039560a..92d9443e 100644
--- a/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/sdc/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
+++ b/plugins/reception-plugins/src/test/java/org/onap/policy/distribution/reception/handling/file/TestFileSystemReceptionHandlerConfigurationParameterGroup.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2018 Intel. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.distribution.reception.handling.sdc;
+package org.onap.policy.distribution.reception.handling.file;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -34,7 +35,6 @@ import java.io.IOException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-
import org.onap.policy.common.parameters.GroupValidationResult;
/**