diff options
Diffstat (limited to 'datafile-app-server/src/main')
8 files changed, 347 insertions, 99 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 82c390f7..f89a1012 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.stereotype.Component; /** + * Holds all configuration for the DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @@ -79,8 +81,10 @@ public class AppConfig { return ftpesConfiguration; } + /** + * Reads the configuration from file. + */ public void loadConfigurationFromFile() { - GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); JsonParser parser = new JsonParser(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index b4dc6353..52723330 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -25,7 +26,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.apache.commons.lang3.StringUtils; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; @@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** + * Api for starting and stopping DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Configuration @EnableScheduling @@ -63,11 +70,18 @@ public class SchedulerConfig { private final ScheduledTasks scheduledTask; private final CloudConfiguration cloudConfiguration; + /** + * Constructor. + * + * @param taskScheduler The scheduler used to schedule the tasks. + * @param scheduledTasks The scheduler that will actually handle the tasks. + * @param cloudConfiguration The DFC configuration. + */ @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, - CloudConfiguration cloudConfiguration) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, + CloudConfiguration cloudConfiguration) { this.taskScheduler = taskScheduler; - this.scheduledTask = scheduledTask; + this.scheduledTask = scheduledTasks; this.cloudConfiguration = cloudConfiguration; } @@ -84,7 +98,7 @@ public class SchedulerConfig { logger.info(EXIT, "Stopped Datafile workflow"); MDC.clear(); return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } /** @@ -106,12 +120,14 @@ public class SchedulerConfig { contextMap = MDC.getCopyOfContextMap(); logger.info(ENTRY, "Start scheduling Datafile workflow"); if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(), - SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), - SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), + Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); + scheduledFutureList.add( + taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList + .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), + SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); return true; } else { @@ -119,4 +135,4 @@ public class SchedulerConfig { } } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index a8f79ea1..af45cc99 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -88,11 +88,17 @@ public class JsonMessageParser { } } + /** + * Parses the Json message and returns a stream of messages. + * + * @param rawMessage the Json message to parse. + * @return a <code>Flux</code> containing messages. + */ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : element.isJsonObject() ? Optional.of((JsonObject) element) @@ -136,13 +142,11 @@ public class JsonMessageParser { List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); - // @formatter:off - return Mono.just(ImmutableFileReadyMessage.builder() - .pnfName(messageMetaData.sourceName()) - .messageMetaData(messageMetaData) - .files(allFileDataFromJson) + return Mono.just(ImmutableFileReadyMessage.builder() // + .pnfName(messageMetaData.sourceName()) // + .messageMetaData(messageMetaData) // + .files(allFileDataFromJson) // .build()); - // @formatter:on } else { return Mono.empty(); } @@ -168,18 +172,16 @@ public class JsonMessageParser { // version. getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); - // @formatter:off - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) - .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) - .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) - .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) - .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) - .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) - .changeIdentifier(changeIdentifier) - .changeType(changeType) + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) // + .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) // + .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) // + .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) // + .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) // + .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) // + .changeIdentifier(changeIdentifier) // + .changeType(changeType) // .build(); - // @formatter:on if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { @@ -231,16 +233,14 @@ public class JsonMessageParser { logger.error("Unable to collect file from xNF.", e); return Optional.empty(); } - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .name(getValueFromJson(fileInfo, NAME, missingValues)) - .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) - .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(location) - .scheme(scheme) - .compression(getValueFromJson(data, COMPRESSION, missingValues)) + FileData fileData = ImmutableFileData.builder() // + .name(getValueFromJson(fileInfo, NAME, missingValues)) // + .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) // + .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) // + .location(location) // + .scheme(scheme) // + .compression(getValueFromJson(data, COMPRESSION, missingValues)) // .build(); - // @formatter:on if (missingValues.isEmpty()) { return Optional.of(fileData); } @@ -250,8 +250,8 @@ public class JsonMessageParser { } /** - * Gets data from the event name, defined as: - * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Gets data from the event name. + * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: * Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 2cb84112..e2dca182 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -13,6 +13,7 @@ * the License. * ============LICENSE_END======================================================================== */ + package org.onap.dcaegen2.collectors.datafile.service; import java.nio.file.Path; @@ -29,14 +30,30 @@ import java.util.Map; public class PublishedFileCache { private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); + /** + * Adds a file to the cache. + * + * @param path the name of the file to add. + * @return <code>null</code> if the file is not already in the cache. + */ public Instant put(Path path) { return publishedFiles.put(path, Instant.now()); } + /** + * Removes a file from the cache. + * + * @param localFileName name of the file to remove. + */ public void remove(Path localFileName) { publishedFiles.remove(localFileName); } + /** + * Removes files 24 hours older than the given instant. + * + * @param now the instant will determine which files that will be purged. + */ public void purge(Instant now) { for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) { Map.Entry<Path, Instant> pair = it.next(); @@ -46,7 +63,7 @@ public class PublishedFileCache { } } - public int size() { + int size() { return publishedFiles.size(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 57edc364..0fef9ab4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,24 +1,46 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.util.Map; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; @@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; + import reactor.core.publisher.Mono; /** + * Publishes a file to the DataRouter. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class DataRouterPublisher { + private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; + private static final String CONTENT_TYPE = "application/octet-stream"; + private static final String NAME_JSON_TAG = "name"; + private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation"; + private static final String PUBLISH_TOPIC = "publish"; + private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; + private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -44,25 +79,70 @@ public class DataRouterPublisher { /** - * Publish one file - * @param consumerDmaapModel information about the file to publish - * @param maxNumberOfRetries the maximal number of retries if the publishing fails - * @param firstBackoffTimeout the time to delay the first retry + * Publish one file. + * + * @param model information about the file to publish + * @param numRetries the maximal number of retries if the publishing fails + * @param firstBackoff the time to delay the first retry * @return the HTTP response status as a string */ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Method called with arg {}", model); - DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); + dmaapProducerReactiveHttpClient = resolveClient(); - //@formatter:off return Mono.just(model) .cache() - .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap)) - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) + .flatMap(m -> publishFile(m, contextMap)) // + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) // .retryBackoff(numRetries, firstBackoff); - //@formatter:on + } + + private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) { + logger.trace("Entering publishFile with {}", consumerDmaapModel); + try { + HttpPut put = new HttpPut(); + String requestId = MDC.get(REQUEST_ID); + put.addHeader(X_ONAP_REQUEST_ID, requestId); + String invocationId = UUID.randomUUID().toString(); + put.addHeader(X_INVOCATION_ID, invocationId); + + prepareHead(consumerDmaapModel, put); + prepareBody(consumerDmaapModel, put); + dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + + HttpResponse response = + dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap); + logger.trace(response.toString()); + return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); + } catch (Exception e) { + logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e); + return Mono.error(e); + } + } + + private void prepareHead(ConsumerDmaapModel model, HttpPut put) { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); + put.addHeader(X_DMAAP_DR_META, metaData.toString()); + put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString())); + } + + private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { + Path fileLocation = model.getInternalLocation(); + try (InputStream fileInputStream = createInputStream(fileLocation)) { + put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + } + } + + private URI getPublishUri(String fileName) { + return dmaapProducerReactiveHttpClient.getBaseUri() // + .pathSegment(PUBLISH_TOPIC) // + .pathSegment(DEFAULT_FEED_ID) // + .pathSegment(fileName).build(); } private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, @@ -77,6 +157,10 @@ public class DataRouterPublisher { } } + InputStream createInputStream(Path filePath) throws IOException { + FileSystemResource realResource = new FileSystemResource(filePath); + return realResource.getInputStream(); + } DmaapPublisherConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapPublisherConfiguration(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index af4670e3..fb27a579 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import reactor.core.publisher.Mono; /** @@ -52,12 +54,10 @@ public class FileCollector { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); - //@formatter:off - return Mono.just(fileData) - .cache() - .flatMap(fd -> collectFile(fileData, metaData, contextMap)) - .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); - //@formatter:on + return Mono.just(fileData) // + .cache() // + .flatMap(fd -> collectFile(fileData, metaData, contextMap)) // + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); } private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, @@ -92,22 +92,20 @@ public class FileCollector { private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { String location = fileData.location(); - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(metaData.productName()) - .vendorName(metaData.vendorName()) - .lastEpochMicrosec(metaData.lastEpochMicrosec()) - .sourceName(metaData.sourceName()) - .startEpochMicrosec(metaData.startEpochMicrosec()) - .timeZoneOffset(metaData.timeZoneOffset()) - .name(fileData.name()) - .location(location) - .internalLocation(localFile.toString()) - .compression(fileData.compression()) - .fileFormatType(fileData.fileFormatType()) - .fileFormatVersion(fileData.fileFormatVersion()) + return ImmutableConsumerDmaapModel.builder() // + .productName(metaData.productName()) // + .vendorName(metaData.vendorName()) // + .lastEpochMicrosec(metaData.lastEpochMicrosec()) // + .sourceName(metaData.sourceName()) // + .startEpochMicrosec(metaData.startEpochMicrosec()) // + .timeZoneOffset(metaData.timeZoneOffset()) // + .name(fileData.name()) // + .location(location) // + .internalLocation(localFile) // + .compression(fileData.compression()) // + .fileFormatType(fileData.fileFormatType()) // + .fileFormatVersion(fileData.fileFormatVersion()) // .build(); - // @formatter:on } SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java new file mode 100644 index 00000000..0729caa0 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -0,0 +1,119 @@ +/*- +* ============LICENSE_START======================================================= +* Copyright (C) 2019 Nordix Foundation. +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* SPDX-License-Identifier: Apache-2.0 +* ============LICENSE_END========================================================= +*/ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; + +import java.io.InputStream; +import java.net.URI; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Bean used to check with DataRouter if a file has been published. + * + * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a> + * + */ +public class PublishedChecker { + private static final String FEEDLOG_TOPIC = "feedlog"; + private static final String DEFAULT_FEED_ID = "1"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private AppConfig appConfig; + + /** + * Constructor. + * + * @param appConfig The DFC configuration. + */ + public PublishedChecker(AppConfig appConfig) { + this.appConfig = appConfig; + } + + /** + * Checks with DataRouter if the given file has been published already. + * + * @param fileName the name of the file used when it is published. + * + * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. + */ + public boolean execute(String fileName, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + DmaapProducerReactiveHttpClient producerClient = resolveClient(); + + HttpGet getRequest = new HttpGet(); + String requestId = MDC.get(REQUEST_ID); + getRequest.addHeader(X_ONAP_REQUEST_ID, requestId); + String invocationId = UUID.randomUUID().toString(); + getRequest.addHeader(X_INVOCATION_ID, invocationId); + getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); + producerClient.addUserCredentialsToHead(getRequest); + + try { + HttpResponse response = + producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap); + + logger.trace(response.toString()); + int status = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + InputStream content = entity.getContent(); + String body = IOUtils.toString(content); + return HttpStatus.SC_OK == status && !"[]".equals(body); + } catch (Exception e) { + logger.warn("Unable to check if file has been published.", e); + return false; + } + } + + private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) { + return producerClient.getBaseUri() // + .pathSegment(FEEDLOG_TOPIC) // + .pathSegment(DEFAULT_FEED_ID) // + .queryParam("type", "pub") // + .queryParam("filename", fileName) // + .build(); + } + + protected DmaapPublisherConfiguration resolveConfiguration() { + return appConfig.getDmaapPublisherConfiguration(); + } + + protected DmaapProducerReactiveHttpClient resolveClient() { + return new DmaapProducerReactiveHttpClient(resolveConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 28963377..d41e5c25 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -18,13 +18,13 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; @@ -37,14 +37,15 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * This implements the main flow of the data file collector. Fetch file ready events from the - * message router, fetch new files from the PNF publish these in the data router. + * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new + * files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { @@ -52,7 +53,7 @@ public class ScheduledTasks { private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - /** Data needed for fetching of one file */ + /** Data needed for fetching of one file. */ private class FileCollectionData { final FileData fileData; final MessageMetaData metaData; @@ -64,6 +65,7 @@ public class ScheduledTasks { } private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); private final Scheduler scheduler = @@ -96,17 +98,17 @@ public class ScheduledTasks { .parallel(getParallelism()) // Each FileReadyMessage in a separate thread .runOn(scheduler) // .flatMap(this::createFileCollectionTask) // - .filter(this::shouldBePublished) // + .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // .flatMap(model -> publishToDataRouter(model, contextMap)) // - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) // + .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); } /** - * called in regular intervals to remove out-dated cached information + * called in regular intervals to remove out-dated cached information. */ public void purgeCachedInformation(Instant now) { alreadyPublishedFiles.purge(now); @@ -144,8 +146,13 @@ public class ScheduledTasks { return Flux.fromIterable(fileCollects); } - private boolean shouldBePublished(FileCollectionData task) { - return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; + private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) { + boolean result = false; + Path localFileName = task.fileData.getLocalFileName(); + if (alreadyPublishedFiles.put(localFileName) == null) { + result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap); + } + return result; } private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect, @@ -156,7 +163,7 @@ public class ScheduledTasks { MdcVariables.setMdcContextMap(contextMap); return createFileCollector() .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, - contextMap) + contextMap) .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); } @@ -174,10 +181,9 @@ public class ScheduledTasks { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); - DataRouterPublisher publisherTask = createDataRouterPublisher(); MdcVariables.setMdcContextMap(contextMap); - return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) + return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); } @@ -185,7 +191,7 @@ public class ScheduledTasks { Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); - Path internalFileName = Paths.get(model.getInternalLocation()); + Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); @@ -223,6 +229,10 @@ public class ScheduledTasks { } } + PublishedChecker createPublishedChecker() { + return new PublishedChecker(applicationConfiguration); + } + int getCurrentNumberOfTasks() { return currentNumberOfTasks.get(); } |