summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java18
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java59
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java31
3 files changed, 90 insertions, 18 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 12f303ef..bc21f96c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -21,7 +21,9 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -29,6 +31,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -39,8 +42,9 @@ import reactor.core.publisher.Mono;
@EnableScheduling
public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
+ private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
@@ -77,11 +81,13 @@ public class SchedulerConfig {
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler
- .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
- Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
- Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+
return true;
} else {
return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
new file mode 100644
index 00000000..2cb84112
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A cache of all files that already has been published. Key is the local file path and the value is
+ * a time stamp, when the key was last used.
+ */
+public class PublishedFileCache {
+ private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+
+ public Instant put(Path path) {
+ return publishedFiles.put(path, Instant.now());
+ }
+
+ public void remove(Path localFileName) {
+ publishedFiles.remove(localFileName);
+ }
+
+ public void purge(Instant now) {
+ for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<Path, Instant> pair = it.next();
+ if (isCachedPublishedFileOutdated(now, pair.getValue())) {
+ it.remove();
+ }
+ }
+ }
+
+ public int size() {
+ return publishedFiles.size();
+ }
+
+ private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ final int timeToKeepInfoInSeconds = 60 * 60 * 24;
+ return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
+ }
+
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 37b7a559..50f5431a 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,11 +20,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -34,6 +32,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -55,8 +54,9 @@ public class ScheduledTasks {
/** Data needed for fetching of files from one PNF */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
- // event
+ final FileCollector collectorTask; // Same object, ftp session etc. can be used for each
+ // file in one VES
+ // event
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -69,7 +69,8 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
private final AtomicInteger taskCounter = new AtomicInteger();
- private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
+
+ PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
@@ -84,7 +85,7 @@ public class ScheduledTasks {
}
/**
- * Main function for scheduling Datafile Workflow.
+ * Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
@@ -105,6 +106,13 @@ public class ScheduledTasks {
//@formatter:on
}
+ /**
+ * called in regular intervals to remove out-dated cached information
+ */
+ public void purgeCachedInformation(Instant now) {
+ alreadyPublishedFiles.purge(now);
+ }
+
private void onComplete() {
logger.info("Datafile tasks have been completed");
}
@@ -121,15 +129,15 @@ public class ScheduledTasks {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration,
- new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+ FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
+ return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
@@ -158,7 +166,6 @@ public class ScheduledTasks {
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
-
}
private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
@@ -179,7 +186,7 @@ public class ScheduledTasks {
final DMaaPMessageConsumerTask messageConsumerTask =
new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute()
+ return messageConsumerTask.execute() //
.onErrorResume(this::handleConsumeMessageFailure);
}