diff options
Diffstat (limited to 'datafile-app-server/src')
4 files changed, 154 insertions, 18 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 12f303ef..bc21f96c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -21,7 +21,9 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -29,6 +31,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -39,8 +42,9 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig { - private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; - private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; + private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); + private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); + private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; @@ -77,11 +81,13 @@ public class SchedulerConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler - .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), - Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, - Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS))); + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), + SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); + return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java new file mode 100644 index 00000000..2cb84112 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.service; + +import java.nio.file.Path; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * A cache of all files that already has been published. Key is the local file path and the value is + * a time stamp, when the key was last used. + */ +public class PublishedFileCache { + private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); + + public Instant put(Path path) { + return publishedFiles.put(path, Instant.now()); + } + + public void remove(Path localFileName) { + publishedFiles.remove(localFileName); + } + + public void purge(Instant now) { + for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) { + Map.Entry<Path, Instant> pair = it.next(); + if (isCachedPublishedFileOutdated(now, pair.getValue())) { + it.remove(); + } + } + } + + public int size() { + return publishedFiles.size(); + } + + private boolean isCachedPublishedFileOutdated(Instant now, Instant then) { + final int timeToKeepInfoInSeconds = 60 * 60 * 24; + return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; + } + + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 37b7a559..50f5431a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -20,11 +20,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -34,6 +32,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -55,8 +54,9 @@ public class ScheduledTasks { /** Data needed for fetching of files from one PNF */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES - // event + final FileCollector collectorTask; // Same object, ftp session etc. can be used for each + // file in one VES + // event final MessageMetaData metaData; FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { @@ -69,7 +69,8 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; private final AtomicInteger taskCounter = new AtomicInteger(); - private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>()); + + PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. @@ -84,7 +85,7 @@ public class ScheduledTasks { } /** - * Main function for scheduling Datafile Workflow. + * Main function for scheduling for the file collection Workflow. */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); @@ -105,6 +106,13 @@ public class ScheduledTasks { //@formatter:on } + /** + * called in regular intervals to remove out-dated cached information + */ + public void purgeCachedInformation(Instant now) { + alreadyPublishedFiles.purge(now); + } + private void onComplete() { logger.info("Datafile tasks have been completed"); } @@ -121,15 +129,15 @@ public class ScheduledTasks { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - FileCollector task = new FileCollector(applicationConfiguration, - new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); + FileCollector task = new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), + new SftpClient(fileData.fileServerData())); fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } private boolean shouldBePublished(FileCollectionData task) { - return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); + return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; } private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { @@ -158,7 +166,6 @@ public class ScheduledTasks { return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) .onErrorResume(exception -> handlePublishFailure(model, exception)); - } private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { @@ -179,7 +186,7 @@ public class ScheduledTasks { final DMaaPMessageConsumerTask messageConsumerTask = new DMaaPMessageConsumerTask(this.applicationConfiguration); - return messageConsumerTask.execute() + return messageConsumerTask.execute() // .onErrorResume(this::handleConsumeMessageFailure); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java new file mode 100644 index 00000000..7b38ee42 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class PublishedFileCacheTest { + + private static PublishedFileCache testObject; + + @BeforeAll + public static void setUp() { + testObject = new PublishedFileCache(); + } + + @Test + public void purgeFiles_timeNotExpired() { + Assertions.assertNull(testObject.put(Paths.get("A"))); + Assertions.assertNotNull(testObject.put(Paths.get("A"))); + testObject.put(Paths.get("B")); + + testObject.purge(Instant.now()); + Assertions.assertEquals(2, testObject.size()); + } + + @Test + public void purgeFiles_timeExpired() { + testObject.put(Paths.get("A")); + testObject.put(Paths.get("B")); + testObject.put(Paths.get("C")); + + testObject.purge(Instant.MAX); + Assertions.assertEquals(0, testObject.size()); + } + + @Test + public void purgeFiles_remove() { + Path path = Paths.get("A"); + testObject.put(path); + Assertions.assertEquals(1, testObject.size()); + testObject.remove(path); + Assertions.assertEquals(0, testObject.size()); + } +} |