aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main
diff options
context:
space:
mode:
authormaximesson <maxime.bonneau@est.tech>2019-03-21 15:58:55 +0000
committermaximesson <maxime.bonneau@est.tech>2019-03-21 15:58:55 +0000
commit4bd281390ed24b278846775c1157f82db81fddbe (patch)
tree1ffaf2384e830e9659e379aab0c833732924ccce /datafile-app-server/src/main
parent6870154043d73d527cc42aca7ade7e49aa961476 (diff)
Add check to DataRouter if file has been published
For each file in the FileReady message that DFC does not know if it has been published yet, it should ask DataRouter if it has been published already to avoid downloading and publishing a file more than once. Change-Id: I18117a6e968ec929aa255052a4c44f890a8ed39d Issue-ID: DCAEGEN2-1256 Signed-off-by: maximesson <maxime.bonneau@est.tech>
Diffstat (limited to 'datafile-app-server/src/main')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java58
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java126
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java40
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java119
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java38
8 files changed, 347 insertions, 99 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f7..f89a1012 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.stereotype.Component;
/**
+ * Holds all configuration for the DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@@ -79,8 +81,10 @@ public class AppConfig {
return ftpesConfiguration;
}
+ /**
+ * Reads the configuration from file.
+ */
public void loadConfigurationFromFile() {
-
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc6353..52723330 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -25,7 +26,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.apache.commons.lang3.StringUtils;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
/**
+ * Api for starting and stopping DFC.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@Configuration
@EnableScheduling
@@ -63,11 +70,18 @@ public class SchedulerConfig {
private final ScheduledTasks scheduledTask;
private final CloudConfiguration cloudConfiguration;
+ /**
+ * Constructor.
+ *
+ * @param taskScheduler The scheduler used to schedule the tasks.
+ * @param scheduledTasks The scheduler that will actually handle the tasks.
+ * @param cloudConfiguration The DFC configuration.
+ */
@Autowired
- public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+ CloudConfiguration cloudConfiguration) {
this.taskScheduler = taskScheduler;
- this.scheduledTask = scheduledTask;
+ this.scheduledTask = scheduledTasks;
this.cloudConfiguration = cloudConfiguration;
}
@@ -84,7 +98,7 @@ public class SchedulerConfig {
logger.info(EXIT, "Stopped Datafile workflow");
MDC.clear();
return Mono.defer(() -> Mono
- .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+ .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
}
/**
@@ -106,12 +120,14 @@ public class SchedulerConfig {
contextMap = MDC.getCopyOfContextMap();
logger.info(ENTRY, "Start scheduling Datafile workflow");
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
- SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
- SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
- scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
- SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+ Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+ scheduledFutureList.add(
+ taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList
+ .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
return true;
} else {
@@ -119,4 +135,4 @@ public class SchedulerConfig {
}
}
-} \ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea1..af45cc99 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,11 +88,17 @@ public class JsonMessageParser {
}
}
+ /**
+ * Parses the Json message and returns a stream of messages.
+ *
+ * @param rawMessage the Json message to parse.
+ * @return a <code>Flux</code> containing messages.
+ */
public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: element.isJsonObject() ? Optional.of((JsonObject) element)
@@ -136,13 +142,11 @@ public class JsonMessageParser {
List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
- // @formatter:off
- return Mono.just(ImmutableFileReadyMessage.builder()
- .pnfName(messageMetaData.sourceName())
- .messageMetaData(messageMetaData)
- .files(allFileDataFromJson)
+ return Mono.just(ImmutableFileReadyMessage.builder() //
+ .pnfName(messageMetaData.sourceName()) //
+ .messageMetaData(messageMetaData) //
+ .files(allFileDataFromJson) //
.build());
- // @formatter:on
} else {
return Mono.empty();
}
@@ -168,18 +172,16 @@ public class JsonMessageParser {
// version.
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
- // @formatter:off
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
- .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
- .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
- .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
- .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
- .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
- .changeIdentifier(changeIdentifier)
- .changeType(changeType)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+ .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+ .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+ .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+ .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+ .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+ .changeIdentifier(changeIdentifier) //
+ .changeType(changeType) //
.build();
- // @formatter:on
if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
return Optional.of(messageMetaData);
} else {
@@ -231,16 +233,14 @@ public class JsonMessageParser {
logger.error("Unable to collect file from xNF.", e);
return Optional.empty();
}
- // @formatter:off
- FileData fileData = ImmutableFileData.builder()
- .name(getValueFromJson(fileInfo, NAME, missingValues))
- .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
- .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
- .location(location)
- .scheme(scheme)
- .compression(getValueFromJson(data, COMPRESSION, missingValues))
+ FileData fileData = ImmutableFileData.builder() //
+ .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+ .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+ .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+ .location(location) //
+ .scheme(scheme) //
+ .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
.build();
- // @formatter:on
if (missingValues.isEmpty()) {
return Optional.of(fileData);
}
@@ -250,8 +250,8 @@ public class JsonMessageParser {
}
/**
- * Gets data from the event name, defined as:
- * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+ * Gets data from the event name.
+ * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
* Noti_RnNode-Ericsson_FileReady
*
* @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb84112..e2dca182 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
* the License.
* ============LICENSE_END========================================================================
*/
+
package org.onap.dcaegen2.collectors.datafile.service;
import java.nio.file.Path;
@@ -29,14 +30,30 @@ import java.util.Map;
public class PublishedFileCache {
private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+ /**
+ * Adds a file to the cache.
+ *
+ * @param path the name of the file to add.
+ * @return <code>null</code> if the file is not already in the cache.
+ */
public Instant put(Path path) {
return publishedFiles.put(path, Instant.now());
}
+ /**
+ * Removes a file from the cache.
+ *
+ * @param localFileName name of the file to remove.
+ */
public void remove(Path localFileName) {
publishedFiles.remove(localFileName);
}
+ /**
+ * Removes files 24 hours older than the given instant.
+ *
+ * @param now the instant will determine which files that will be purged.
+ */
public void purge(Instant now) {
for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@ public class PublishedFileCache {
}
}
- public int size() {
+ int size() {
return publishedFiles.size();
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..0fef9ab4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
* @return the HTTP response status as a string
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e3..fb27a579 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import reactor.core.publisher.Mono;
/**
@@ -52,12 +54,10 @@ public class FileCollector {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- //@formatter:off
- return Mono.just(fileData)
- .cache()
- .flatMap(fd -> collectFile(fileData, metaData, contextMap))
- .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
- //@formatter:on
+ return Mono.just(fileData) //
+ .cache() //
+ .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+ .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
}
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
@@ -92,22 +92,20 @@ public class FileCollector {
private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
String location = fileData.location();
- // @formatter:off
- return ImmutableConsumerDmaapModel.builder()
- .productName(metaData.productName())
- .vendorName(metaData.vendorName())
- .lastEpochMicrosec(metaData.lastEpochMicrosec())
- .sourceName(metaData.sourceName())
- .startEpochMicrosec(metaData.startEpochMicrosec())
- .timeZoneOffset(metaData.timeZoneOffset())
- .name(fileData.name())
- .location(location)
- .internalLocation(localFile.toString())
- .compression(fileData.compression())
- .fileFormatType(fileData.fileFormatType())
- .fileFormatVersion(fileData.fileFormatVersion())
+ return ImmutableConsumerDmaapModel.builder() //
+ .productName(metaData.productName()) //
+ .vendorName(metaData.vendorName()) //
+ .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+ .sourceName(metaData.sourceName()) //
+ .startEpochMicrosec(metaData.startEpochMicrosec()) //
+ .timeZoneOffset(metaData.timeZoneOffset()) //
+ .name(fileData.name()) //
+ .location(location) //
+ .internalLocation(localFile) //
+ .compression(fileData.compression()) //
+ .fileFormatType(fileData.fileFormatType()) //
+ .fileFormatVersion(fileData.fileFormatVersion()) //
.build();
- // @formatter:on
}
SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 00000000..0729caa0
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,119 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String DEFAULT_FEED_ID = "1";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AppConfig appConfig;
+
+ /**
+ * Constructor.
+ *
+ * @param appConfig The DFC configuration.
+ */
+ public PublishedChecker(AppConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Checks with DataRouter if the given file has been published already.
+ *
+ * @param fileName the name of the file used when it is published.
+ *
+ * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+ */
+ public boolean execute(String fileName, Map<String, String> contextMap) {
+ MdcVariables.setMdcContextMap(contextMap);
+ DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+ HttpGet getRequest = new HttpGet();
+ String requestId = MDC.get(REQUEST_ID);
+ getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ getRequest.addHeader(X_INVOCATION_ID, invocationId);
+ getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+ producerClient.addUserCredentialsToHead(getRequest);
+
+ try {
+ HttpResponse response =
+ producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+ logger.trace(response.toString());
+ int status = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+ InputStream content = entity.getContent();
+ String body = IOUtils.toString(content);
+ return HttpStatus.SC_OK == status && !"[]".equals(body);
+ } catch (Exception e) {
+ logger.warn("Unable to check if file has been published.", e);
+ return false;
+ }
+ }
+
+ private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+ return producerClient.getBaseUri() //
+ .pathSegment(FEEDLOG_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .queryParam("type", "pub") //
+ .queryParam("filename", fileName) //
+ .build();
+ }
+
+ protected DmaapPublisherConfiguration resolveConfiguration() {
+ return appConfig.getDmaapPublisherConfiguration();
+ }
+
+ protected DmaapProducerReactiveHttpClient resolveClient() {
+ return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+ }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 28963377..d41e5c25 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
@@ -18,13 +18,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
@@ -52,7 +53,7 @@ public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of one file */
+ /** Data needed for fetching of one file. */
private class FileCollectionData {
final FileData fileData;
final MessageMetaData metaData;
@@ -64,6 +65,7 @@ public class ScheduledTasks {
}
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final AppConfig applicationConfiguration;
private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
private final Scheduler scheduler =
@@ -96,17 +98,17 @@ public class ScheduledTasks {
.parallel(getParallelism()) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.flatMap(this::createFileCollectionTask) //
- .filter(this::shouldBePublished) //
+ .filter(fileData -> shouldBePublished(fileData, contextMap)) //
.doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
.flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
.flatMap(model -> publishToDataRouter(model, contextMap)) //
- .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+ .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
.doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
.sequential();
}
/**
- * called in regular intervals to remove out-dated cached information
+ * called in regular intervals to remove out-dated cached information.
*/
public void purgeCachedInformation(Instant now) {
alreadyPublishedFiles.purge(now);
@@ -144,8 +146,13 @@ public class ScheduledTasks {
return Flux.fromIterable(fileCollects);
}
- private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+ private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+ boolean result = false;
+ Path localFileName = task.fileData.getLocalFileName();
+ if (alreadyPublishedFiles.put(localFileName) == null) {
+ result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+ }
+ return result;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
@@ -156,7 +163,7 @@ public class ScheduledTasks {
MdcVariables.setMdcContextMap(contextMap);
return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
- contextMap)
+ contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
}
@@ -174,10 +181,9 @@ public class ScheduledTasks {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = createDataRouterPublisher();
MdcVariables.setMdcContextMap(contextMap);
- return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+ return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
.onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
}
@@ -185,7 +191,7 @@ public class ScheduledTasks {
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
- Path internalFileName = Paths.get(model.getInternalLocation());
+ Path internalFileName = model.getInternalLocation();
deleteFile(internalFileName, contextMap);
alreadyPublishedFiles.remove(internalFileName);
currentNumberOfTasks.decrementAndGet();
@@ -223,6 +229,10 @@ public class ScheduledTasks {
}
}
+ PublishedChecker createPublishedChecker() {
+ return new PublishedChecker(applicationConfiguration);
+ }
+
int getCurrentNumberOfTasks() {
return currentNumberOfTasks.get();
}