Diffstat (limited to 'datafile-app-server')
22 files changed, 1282 insertions, 786 deletions
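The central refactor running through this change set: MessageMetaData no longer lives on FileReadyMessage (its pnfName() and messageMetaData() accessors are removed) but is embedded in every FileData, so FileCollector and ScheduledTasks stop passing the metadata around as a separate argument. A minimal usage sketch of the new shape, with the builder fields and the execute() signature taken from the diff below; the literal values and the metaData, appConfig and contextMap variables are placeholders, not code from this change (types come from the datafile model, ftp and tasks packages):

    // Each FileData now carries its MessageMetaData (illustrative values only).
    FileData fileData = ImmutableFileData.builder()
            .name("A20190101.1030-1045.xml.gz")                 // placeholder file name
            .location("ftpes://pnf1.example.org:22/rop/A20190101.1030-1045.xml.gz")
            .scheme(Scheme.FTPS)
            .compression("gzip")
            .fileFormatType("org.3GPP.32.435#measCollec")       // placeholder format values
            .fileFormatVersion("V10")
            .messageMetaData(metaData)                          // metaData built elsewhere from the VES common event header
            .build();

    // The PNF name and the local target path are now derived from the embedded metadata:
    String pnfName = fileData.sourceName();                     // delegates to messageMetaData().sourceName()
    Path localFile = fileData.getLocalFileName();               // /tmp/onap_datafile/<sourceName>_<fileName>

    // FileCollector.execute() drops the separate MessageMetaData parameter:
    Mono<ConsumerDmaapModel> collected =
            new FileCollector(appConfig).execute(fileData, 3, Duration.ofSeconds(5), contextMap);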
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml index f2538578..8985b9b8 100644 --- a/datafile-app-server/config/application.yaml +++ b/datafile-app-server/config/application.yaml @@ -15,7 +15,7 @@ logging: org.springframework: ERROR org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR - org.onap.dcaegen2.collectors.datafile: ERROR + org.onap.dcaegen2.collectors.datafile: WARN file: /var/log/ONAP/application.log app: filepath: /opt/app/datafile/config/datafile_endpoints.json diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 82c390f7..a30d2826 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -42,6 +42,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.stereotype.Component; /** + * Holds all configuration for the DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @@ -79,8 +81,10 @@ public class AppConfig { return ftpesConfiguration; } + /** + * Reads the configuration from file. + */ public void loadConfigurationFromFile() { - GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); JsonParser parser = new JsonParser(); @@ -108,10 +112,8 @@ public class AppConfig { setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); } - } catch (IOException e) { - logger.error("Problem with file loading, file: {}", filepath, e); - } catch (JsonSyntaxException e) { - logger.error("Problem with Json deserialization", e); + } catch (JsonSyntaxException | IOException e) { + logger.error("Problem with loading configuration, file: {}", filepath, e); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index b4dc6353..d5c8b3b2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. 
* =============================================================================================== @@ -18,6 +18,7 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID; import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; + import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -25,7 +26,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.apache.commons.lang3.StringUtils; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; @@ -40,11 +43,15 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; /** + * Api for starting and stopping DFC. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18 + * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @Configuration @EnableScheduling @@ -56,18 +63,25 @@ public class SchedulerConfig { private static final Logger logger = LoggerFactory.getLogger(SchedulerConfig.class); private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static final Marker EXIT = MarkerFactory.getMarker("EXIT"); - private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); + private static List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private Map<String, String> contextMap; private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; private final CloudConfiguration cloudConfiguration; + /** + * Constructor. + * + * @param taskScheduler The scheduler used to schedule the tasks. + * @param scheduledTasks The scheduler that will actually handle the tasks. + * @param cloudConfiguration The DFC configuration. 
+ */ @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, - CloudConfiguration cloudConfiguration) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks, + CloudConfiguration cloudConfiguration) { this.taskScheduler = taskScheduler; - this.scheduledTask = scheduledTask; + this.scheduledTask = scheduledTasks; this.cloudConfiguration = cloudConfiguration; } @@ -83,8 +97,7 @@ public class SchedulerConfig { MdcVariables.setMdcContextMap(contextMap); logger.info(EXIT, "Stopped Datafile workflow"); MDC.clear(); - return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + return Mono.just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)); } /** @@ -106,12 +119,14 @@ public class SchedulerConfig { contextMap = MDC.getCopyOfContextMap(); logger.info(ENTRY, "Start scheduling Datafile workflow"); if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(), - SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), - SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); - scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), - SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), + Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); + scheduledFutureList.add( + taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap), + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList + .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), + SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); return true; } else { @@ -119,4 +134,4 @@ public class SchedulerConfig { } } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java deleted file mode 100644 index 36279016..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.exceptions; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ -public class DmaapNotFoundException extends DatafileTaskException { - - private static final long serialVersionUID = 1L; - - public DmaapNotFoundException(String message) { - super(message); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index bdb47b2b..d0d1f91a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -37,10 +37,19 @@ import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; public abstract class FileData { private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/"; + /** + * @return the file name with no path + */ public abstract String name(); + /** + * @return the URL to use to fetch the file from the PNF + */ public abstract String location(); + /** + * @return the file transfer protocol to use for fetching the file + */ public abstract Scheme scheme(); public abstract String compression(); @@ -49,17 +58,25 @@ public abstract class FileData { public abstract String fileFormatVersion(); + public abstract MessageMetaData messageMetaData(); + + /** + * @return the name of the PNF, must be unique in the network + */ + public String sourceName() { + return messageMetaData().sourceName(); + } + public String remoteFilePath() { return URI.create(location()).getPath(); } public Path getLocalFileName() { - URI uri = URI.create(location()); - return createLocalFileName(uri.getHost(), name()); + return createLocalFileName(messageMetaData().sourceName(), name()); } - public static Path createLocalFileName(String host, String fileName) { - return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName); + public static Path createLocalFileName(String sourceName, String fileName) { + return 
Paths.get(DATAFILE_TMPDIR, sourceName + "_" + fileName); } public FileServerData fileServerData() { @@ -86,4 +103,4 @@ public abstract class FileData { } return Optional.empty(); } -}
\ No newline at end of file +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java index e3293faa..9373a4f2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java @@ -31,9 +31,5 @@ import org.immutables.value.Value; @Value.Immutable @Gson.TypeAdapters public interface FileReadyMessage { - public String pnfName(); - - public MessageMetaData messageMetaData(); - public List<FileData> files(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index a8f79ea1..a3595ecf 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -88,15 +88,25 @@ public class JsonMessageParser { } } + /** + * Parses the Json message and returns a stream of messages. + * + * @param rawMessage the Json message to parse. + * @return a <code>Flux</code> containing messages. + */ public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) { return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); - return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) - : element.isJsonObject() ? Optional.of((JsonObject) element) - : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + if (element.isJsonPrimitive()) { + return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()); + } else if (element.isJsonObject()) { + return Optional.of((JsonObject) element); + } else { + return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + } } private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) { @@ -127,34 +137,32 @@ public class JsonMessageParser { : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. 
" + jsonObject)); } + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { + MessageMetaData messageMetaData = optionalMessageMetaData.get(); JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (arrayOfNamedHashMap != null) { - List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap); + List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap, messageMetaData); if (!allFileDataFromJson.isEmpty()) { - MessageMetaData messageMetaData = optionalMessageMetaData.get(); - // @formatter:off - return Mono.just(ImmutableFileReadyMessage.builder() - .pnfName(messageMetaData.sourceName()) - .messageMetaData(messageMetaData) - .files(allFileDataFromJson) + return Mono.just(ImmutableFileReadyMessage.builder() // + .files(allFileDataFromJson) // .build()); - // @formatter:on } else { return Mono.empty(); } } - logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); + logger.error("VES event parsing. Missing arrayOfNamedHashMap in message. {}", message); return Mono.empty(); } - logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); + logger.error("VES event parsing. FileReady event has incorrect JsonObject. {}", message); return Mono.empty(); } + private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { List<String> missingValues = new ArrayList<>(); JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER); @@ -168,22 +176,20 @@ public class JsonMessageParser { // version. 
getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues); - // @formatter:off - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) - .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) - .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) - .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) - .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) - .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) - .changeIdentifier(changeIdentifier) - .changeType(changeType) + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) // + .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) // + .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) // + .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) // + .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) // + .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) // + .changeIdentifier(changeIdentifier) // + .changeType(changeType) // .build(); - // @formatter:on if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) { return Optional.of(messageMetaData); } else { - String errorMessage = "Unable to collect file from xNF."; + String errorMessage = "VES event parsing."; if (!missingValues.isEmpty()) { errorMessage += " Missing data: " + missingValues; } @@ -204,11 +210,11 @@ public class JsonMessageParser { return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier); } - private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) { + private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields, MessageMetaData messageMetaData) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i); - Optional<FileData> fileData = getFileDataFromJson(fileInfo); + Optional<FileData> fileData = getFileDataFromJson(fileInfo, messageMetaData); if (fileData.isPresent()) { res.add(fileData.get()); @@ -217,7 +223,7 @@ public class JsonMessageParser { return res; } - private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) { + private Optional<FileData> getFileDataFromJson(JsonObject fileInfo, MessageMetaData messageMetaData) { logger.trace("starting to getFileDataFromJson!"); List<String> missingValues = new ArrayList<>(); @@ -228,30 +234,28 @@ public class JsonMessageParser { try { scheme = Scheme.getSchemeFromString(URI.create(location).getScheme()); } catch (Exception e) { - logger.error("Unable to collect file from xNF.", e); + logger.error("VES event parsing.", e); return Optional.empty(); } - // @formatter:off - FileData fileData = ImmutableFileData.builder() - .name(getValueFromJson(fileInfo, NAME, missingValues)) - .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) - .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) - .location(location) - .scheme(scheme) - .compression(getValueFromJson(data, COMPRESSION, 
missingValues)) + FileData fileData = ImmutableFileData.builder() // + .name(getValueFromJson(fileInfo, NAME, missingValues)) // + .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) // + .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) // + .location(location) // + .scheme(scheme) // + .compression(getValueFromJson(data, COMPRESSION, missingValues)) // + .messageMetaData(messageMetaData) .build(); - // @formatter:on if (missingValues.isEmpty()) { return Optional.of(fileData); } - logger.error("Unable to collect file from xNF. File information wrong. Missing data: {} Data: {}", - missingValues, fileInfo); + logger.error("VES event parsing. File information wrong. Missing data: {} Data: {}", missingValues, fileInfo); return Optional.empty(); } /** - * Gets data from the event name, defined as: - * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: + * Gets data from the event name. + * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example: * Noti_RnNode-Ericsson_FileReady * * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}. diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java index 2cb84112..e2dca182 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -13,6 +13,7 @@ * the License. * ============LICENSE_END======================================================================== */ + package org.onap.dcaegen2.collectors.datafile.service; import java.nio.file.Path; @@ -29,14 +30,30 @@ import java.util.Map; public class PublishedFileCache { private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); + /** + * Adds a file to the cache. + * + * @param path the name of the file to add. + * @return <code>null</code> if the file is not already in the cache. + */ public Instant put(Path path) { return publishedFiles.put(path, Instant.now()); } + /** + * Removes a file from the cache. + * + * @param localFileName name of the file to remove. + */ public void remove(Path localFileName) { publishedFiles.remove(localFileName); } + /** + * Removes files 24 hours older than the given instant. + * + * @param now the instant will determine which files that will be purged. 
+ */ public void purge(Instant now) { for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) { Map.Entry<Path, Instant> pair = it.next(); @@ -46,7 +63,7 @@ public class PublishedFileCache { } } - public int size() { + int size() { return publishedFiles.size(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index f6daf733..49e2f01e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -30,8 +30,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; + import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -39,25 +40,21 @@ import reactor.core.publisher.Flux; public class DMaaPMessageConsumerTask { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); - private AppConfig datafileAppConfig; - private JsonMessageParser jsonMessageParser; - private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; + private final JsonMessageParser jsonMessageParser; + private final DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) { - this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = new JsonMessageParser(); + this.dmaaPConsumerReactiveHttpClient = createHttpClient(datafileAppConfig); } - protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig, - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + protected DMaaPMessageConsumerTask(DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, JsonMessageParser messageParser) { - this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.jsonMessageParser = messageParser; } public Flux<FileReadyMessage> execute() { - dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("execute called"); return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse())); } @@ -67,15 +64,10 @@ public class DMaaPMessageConsumerTask { return jsonMessageParser.getMessagesFromJson(message); } - protected DmaapConsumerConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapConsumerConfiguration(); + private static DMaaPConsumerReactiveHttpClient createHttpClient(AppConfig datafileAppConfig) { + DmaapConsumerConfiguration config = datafileAppConfig.getDmaapConsumerConfiguration(); + WebClient client = new DmaapReactiveWebClient().fromConfiguration(config).build(); + return new DMaaPConsumerReactiveHttpClient(config, client); } - protected DMaaPConsumerReactiveHttpClient resolveClient() { - return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient()); - } - - protected WebClient buildWebClient() { - return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build(); - } } diff --git 
a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 57edc364..4207d1fc 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,24 +1,46 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; + +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.util.Map; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; @@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.core.io.FileSystemResource; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; + import reactor.core.publisher.Mono; /** + * Publishes a file to the DataRouter. + * * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ public class DataRouterPublisher { + private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; + private static final String CONTENT_TYPE = "application/octet-stream"; + private static final String NAME_JSON_TAG = "name"; + private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation"; + private static final String PUBLISH_TOPIC = "publish"; + private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; + private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -44,25 +79,71 @@ public class DataRouterPublisher { /** - * Publish one file - * @param consumerDmaapModel information about the file to publish - * @param maxNumberOfRetries the maximal number of retries if the publishing fails - * @param firstBackoffTimeout the time to delay the first retry - * @return the HTTP response status as a string + * Publish one file. 
+ * + * @param model information about the file to publish + * @param numRetries the maximal number of retries if the publishing fails + * @param firstBackoff the time to delay the first retry + * @param contextMap tracing context variables + * @return the (same) ConsumerDmaapModel */ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("Method called with arg {}", model); - DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); + logger.trace("Publish called with arg {}", model); + dmaapProducerReactiveHttpClient = resolveClient(); - //@formatter:off return Mono.just(model) .cache() - .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap)) - .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) + .flatMap(m -> publishFile(m, contextMap)) // + .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) // .retryBackoff(numRetries, firstBackoff); - //@formatter:on + } + + private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) { + logger.trace("Entering publishFile with {}", consumerDmaapModel); + try { + HttpPut put = new HttpPut(); + String requestId = MDC.get(REQUEST_ID); + put.addHeader(X_ONAP_REQUEST_ID, requestId); + String invocationId = UUID.randomUUID().toString(); + put.addHeader(X_INVOCATION_ID, invocationId); + + prepareHead(consumerDmaapModel, put); + prepareBody(consumerDmaapModel, put); + dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + + HttpResponse response = + dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap); + logger.trace(response.toString()); + return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); + } catch (Exception e) { + logger.warn("Unable to send file to DataRouter. 
Data: {}", consumerDmaapModel.getInternalLocation(), e); + return Mono.error(e); + } + } + + private void prepareHead(ConsumerDmaapModel model, HttpPut put) { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); + JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model)); + metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString(); + metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG); + put.addHeader(X_DMAAP_DR_META, metaData.toString()); + put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString())); + } + + private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException { + Path fileLocation = model.getInternalLocation(); + try (InputStream fileInputStream = createInputStream(fileLocation)) { + put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream))); + } + } + + private URI getPublishUri(String fileName) { + return dmaapProducerReactiveHttpClient.getBaseUri() // + .pathSegment(PUBLISH_TOPIC) // + .pathSegment(DEFAULT_FEED_ID) // + .pathSegment(fileName).build(); } private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model, @@ -77,6 +158,10 @@ public class DataRouterPublisher { } } + InputStream createInputStream(Path filePath) throws IOException { + FileSystemResource realResource = new FileSystemResource(filePath); + return realResource.getInputStream(); + } DmaapPublisherConfiguration resolveConfiguration() { return datafileAppConfig.getDmaapPublisherConfiguration(); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index af4670e3..8849b45e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,6 +20,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.Map; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -33,6 +34,7 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import reactor.core.publisher.Mono; /** @@ -47,33 +49,31 @@ public class FileCollector { this.datafileAppConfig = datafileAppConfig; } - public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, - Duration firstBackoffTimeout, Map<String, String> contextMap) { + public Mono<ConsumerDmaapModel> execute(FileData fileData, long maxNumberOfRetries, Duration firstBackoffTimeout, + Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); - //@formatter:off - return Mono.just(fileData) - .cache() - .flatMap(fd -> collectFile(fileData, metaData, contextMap)) - .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); - //@formatter:on + return Mono.just(fileData) // + .cache() // + .flatMap(fd -> collectFile(fileData, contextMap)) // + .retryBackoff(maxNumberOfRetries, firstBackoffTimeout); } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, 
- Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> collectFile(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.trace("starting to collectFile"); + logger.trace("starting to collectFile {}", fileData.name()); final String remoteFile = fileData.remoteFilePath(); final Path localFile = fileData.getLocalFileName(); try (FileCollectClient currentClient = createClient(fileData)) { + currentClient.open(); localFile.getParent().toFile().mkdir(); // Create parent directories currentClient.collectFile(remoteFile, localFile); - return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); + return Mono.just(getConsumerDmaapModel(fileData, localFile)); } catch (Exception throwable) { - logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable); + logger.warn("Failed to download file: {} {}, reason: {}", fileData.sourceName(), fileData.name(), throwable.toString()); return Mono.error(throwable); } } @@ -89,37 +89,32 @@ public class FileCollector { } } - private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) { + private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, Path localFile) { String location = fileData.location(); - - // @formatter:off - return ImmutableConsumerDmaapModel.builder() - .productName(metaData.productName()) - .vendorName(metaData.vendorName()) - .lastEpochMicrosec(metaData.lastEpochMicrosec()) - .sourceName(metaData.sourceName()) - .startEpochMicrosec(metaData.startEpochMicrosec()) - .timeZoneOffset(metaData.timeZoneOffset()) - .name(fileData.name()) - .location(location) - .internalLocation(localFile.toString()) - .compression(fileData.compression()) - .fileFormatType(fileData.fileFormatType()) - .fileFormatVersion(fileData.fileFormatVersion()) + MessageMetaData metaData = fileData.messageMetaData(); + return ImmutableConsumerDmaapModel.builder() // + .productName(metaData.productName()) // + .vendorName(metaData.vendorName()) // + .lastEpochMicrosec(metaData.lastEpochMicrosec()) // + .sourceName(metaData.sourceName()) // + .startEpochMicrosec(metaData.startEpochMicrosec()) // + .timeZoneOffset(metaData.timeZoneOffset()) // + .name(fileData.name()) // + .location(location) // + .internalLocation(localFile) // + .compression(fileData.compression()) // + .fileFormatType(fileData.fileFormatType()) // + .fileFormatVersion(fileData.fileFormatVersion()) // .build(); - // @formatter:on } - SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { - SftpClient client = new SftpClient(fileData.fileServerData()); - client.open(); - return client; + protected SftpClient createSftpClient(FileData fileData) { + return new SftpClient(fileData.fileServerData()); } - FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException { + protected FtpsClient createFtpsClient(FileData fileData) { FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); - FtpsClient client = new FtpsClient(fileData.fileServerData()); - client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword()); - return client; + return new FtpsClient(fileData.fileServerData(), config.keyCert(), config.keyPassword(), + Paths.get(config.trustedCA()), config.trustedCAPassword()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java 
b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java new file mode 100644 index 00000000..41f8e3cd --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -0,0 +1,120 @@ +/*- +* ============LICENSE_START======================================================= +* Copyright (C) 2019 Nordix Foundation. +* ================================================================================ +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* SPDX-License-Identifier: Apache-2.0 +* ============LICENSE_END========================================================= +*/ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID; +import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID; + +import java.io.InputStream; +import java.net.URI; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.io.IOUtils; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; +import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +/** + * Bean used to check with DataRouter if a file has been published. + * + * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a> + * + */ +public class PublishedChecker { + private static final String FEEDLOG_TOPIC = "feedlog"; + private static final String DEFAULT_FEED_ID = "1"; + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private final AppConfig appConfig; + + /** + * Constructor. + * + * @param appConfig The DFC configuration. + */ + public PublishedChecker(AppConfig appConfig) { + this.appConfig = appConfig; + } + + /** + * Checks with DataRouter if the given file has been published already. + * + * @param fileName the name of the file used when it is published. + * + * @return <code>true</code> if the file has been published before, <code>false</code> otherwise. 
+ */ + public boolean execute(String fileName, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + DmaapProducerReactiveHttpClient producerClient = resolveClient(); + + HttpGet getRequest = new HttpGet(); + String requestId = MDC.get(REQUEST_ID); + getRequest.addHeader(X_ONAP_REQUEST_ID, requestId); + String invocationId = UUID.randomUUID().toString(); + getRequest.addHeader(X_INVOCATION_ID, invocationId); + getRequest.setURI(getPublishedQueryUri(fileName, producerClient)); + producerClient.addUserCredentialsToHead(getRequest); + + try { + HttpResponse response = + producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap); + + logger.trace("{}", response); + int status = response.getStatusLine().getStatusCode(); + HttpEntity entity = response.getEntity(); + try (InputStream content = entity.getContent()) { + String body = IOUtils.toString(content); + return HttpStatus.SC_OK == status && !"[]".equals(body); + } + } catch (Exception e) { + logger.warn("Unable to check if file has been published.", e); + return false; + } + } + + private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) { + return producerClient.getBaseUri() // + .pathSegment(FEEDLOG_TOPIC) // + .pathSegment(DEFAULT_FEED_ID) // + .queryParam("type", "pub") // + .queryParam("filename", fileName) // + .build(); + } + + protected DmaapPublisherConfiguration resolveConfiguration() { + return appConfig.getDmaapPublisherConfiguration(); + } + + protected DmaapProducerReactiveHttpClient resolveClient() { + return new DmaapProducerReactiveHttpClient(resolveConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 28963377..b4096c73 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. 
* =============================================================================================== @@ -18,18 +18,15 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; + import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; -import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables; import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; @@ -37,6 +34,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -49,25 +47,18 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; - private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - - /** Data needed for fetching of one file */ - private class FileCollectionData { - final FileData fileData; - final MessageMetaData metaData; - - FileCollectionData(FileData fd, MessageMetaData metaData) { - this.fileData = fd; - this.metaData = metaData; - } - } + private static final int NUMBER_OF_WORKER_THREADS = 100; + private static final int MAX_TASKS_FOR_POLLING = 50; + private static final long DATA_ROUTER_MAX_RETRIES = 5; + private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2); + private static final long FILE_TRANSFER_MAX_RETRIES = 3; + private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5); private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final AppConfig applicationConfiguration; private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); - private final Scheduler scheduler = - Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); + private final Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS); PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** @@ -84,86 +75,93 @@ public class ScheduledTasks { * Main function for scheduling for the file collection Workflow. 
*/ public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - applicationConfiguration.loadConfigurationFromFile(); - createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), - () -> onComplete(contextMap)); + try { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Execution of tasks was registered"); + applicationConfiguration.loadConfigurationFromFile(); + createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), + () -> onComplete(contextMap)); + } catch (Exception e) { + logger.error("Unexpected exception: ", e); + } } Flux<ConsumerDmaapModel> createMainTask(Map<String, String> contextMap) { return fetchMoreFileReadyMessages() // - .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread .runOn(scheduler) // - .flatMap(this::createFileCollectionTask) // - .filter(this::shouldBePublished) // + .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) // + .filter(fileData -> shouldBePublished(fileData, contextMap)) // .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // - .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) // + .flatMap(fileData -> fetchFile(fileData, contextMap)) // .flatMap(model -> publishToDataRouter(model, contextMap)) // - .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) // + .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) // .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // .sequential(); } /** - * called in regular intervals to remove out-dated cached information + * called in regular intervals to remove out-dated cached information. 
*/ public void purgeCachedInformation(Instant now) { alreadyPublishedFiles.purge(now); } - private void onComplete(Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile tasks have been completed"); + protected PublishedChecker createPublishedChecker() { + return new PublishedChecker(applicationConfiguration); } - private void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.info("Datafile consumed tasks {}", model.getInternalLocation()); + protected int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); } - private void onError(Throwable throwable, Map<String, String> contextMap) { - MdcVariables.setMdcContextMap(contextMap); - logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); + protected DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - private int getParallelism() { - if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { - return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); - } else { - return 1; // We need at least one rail/thread - } + protected FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); } - private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { - List<FileCollectionData> fileCollects = new ArrayList<>(); + protected DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); + } - for (FileData fileData : availableFiles.files()) { - fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); - } - return Flux.fromIterable(fileCollects); + private void onComplete(Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.trace("Datafile tasks have been completed"); + } + + private synchronized void onSuccess(ConsumerDmaapModel model, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.info("Datafile file published {}", model.getInternalLocation()); } - private boolean shouldBePublished(FileCollectionData task) { - return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; + private void onError(Throwable throwable, Map<String, String> contextMap) { + MdcVariables.setMdcContextMap(contextMap); + logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable.toString()); } - private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect, - Map<String, String> contextMap) { - final long maxNUmberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); + private boolean shouldBePublished(FileData fileData, Map<String, String> contextMap) { + boolean result = false; + Path localFileName = fileData.getLocalFileName(); + if (alreadyPublishedFiles.put(localFileName) == null) { + result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap); + } + return result; + } + private Mono<ConsumerDmaapModel> fetchFile(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); return createFileCollector() - .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, - contextMap) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); + .execute(fileData, 
FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handleFetchFileFailure(fileData, contextMap)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handleFetchFileFailure(FileData fileData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); Path localFileName = fileData.getLocalFileName(); - logger.error("File fetching failed: {}", localFileName); + logger.error("File fetching failed, fileData {}", fileData); deleteFile(localFileName, contextMap); alreadyPublishedFiles.remove(localFileName); currentNumberOfTasks.decrementAndGet(); @@ -171,21 +169,17 @@ public class ScheduledTasks { } private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model, Map<String, String> contextMap) { - final long maxNumberOfRetries = 3; - final Duration initialRetryTimeout = Duration.ofSeconds(5); - - DataRouterPublisher publisherTask = createDataRouterPublisher(); - MdcVariables.setMdcContextMap(contextMap); - return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap) - .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap)); + + return createDataRouterPublisher() + .execute(model, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT, contextMap) + .onErrorResume(exception -> handlePublishFailure(model, contextMap)); } - private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception, - Map<String, String> contextMap) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); - Path internalFileName = Paths.get(model.getInternalLocation()); + logger.error("File publishing failed: {}", model); + Path internalFileName = model.getInternalLocation(); deleteFile(internalFileName, contextMap); alreadyPublishedFiles.remove(internalFileName); currentNumberOfTasks.decrementAndGet(); @@ -196,8 +190,9 @@ public class ScheduledTasks { * Fetch more messages from the message router. This is done in a polling/blocking fashion. 
*/ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { - logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); - if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { + logger.info("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_TASKS_FOR_POLLING) { + logger.info("Skipping, current number of tasks: {}", getCurrentNumberOfTasks()); return Flux.empty(); } @@ -209,7 +204,8 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); - logger.error("Polling for file ready message failed, exception: {}", exception); + logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), + this.applicationConfiguration.getDmaapConsumerConfiguration()); return Flux.empty(); } @@ -223,20 +219,4 @@ public class ScheduledTasks { } } - int getCurrentNumberOfTasks() { - return currentNumberOfTasks.get(); - } - - DMaaPMessageConsumerTask createConsumerTask() { - return new DMaaPMessageConsumerTask(this.applicationConfiguration); - } - - FileCollector createFileCollector() { - return new FileCollector(applicationConfiguration); - } - - DataRouterPublisher createDataRouterPublisher() { - return new DataRouterPublisher(applicationConfiguration); - } - } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java index 2c136304..2c136304 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java index acae1e6e..b67fac23 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java @@ -28,56 +28,49 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.Immutabl class CloudConfigParserTest { - - private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = - //@formatter:on - new ImmutableDmaapConsumerConfiguration.Builder() - .timeoutMs(-1) - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapUserName("admin") - .dmaapUserPassword("admin") - .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") - .dmaapPortNumber(2222) - .dmaapContentType("application/json") - .messageLimit(-1) - .dmaapProtocol("http") - .consumerId("C12") - .consumerGroup("OpenDCAE-c12") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) + private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // + new ImmutableDmaapConsumerConfiguration.Builder() // + .timeoutMs(-1) // + 
.dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("admin") // + .dmaapUserPassword("admin") // + .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") // + .dmaapPortNumber(2222) // + .dmaapContentType("application/json") // + .messageLimit(-1) // + .dmaapProtocol("http") // + .consumerId("C12") // + .consumerGroup("OpenDCAE-c12") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // .build(); - //@formatter:off - - private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = - //@formatter:on - new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapTopicName("publish") - .dmaapUserPassword("dradmin") - .dmaapPortNumber(3907) - .dmaapProtocol("https") - .dmaapContentType("application/json") - .dmaapHostName("message-router.onap.svc.cluster.local") - .dmaapUserName("dradmin") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) + + private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // + new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapTopicName("publish") // + .dmaapUserPassword("dradmin") // + .dmaapPortNumber(3907) // + .dmaapProtocol("https") // + .dmaapContentType("application/json") // + .dmaapHostName("message-router.onap.svc.cluster.local") // + .dmaapUserName("dradmin") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // .build(); - //@formatter:off - - private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = - //@formatter:on - new ImmutableFtpesConfig.Builder() - .keyCert("/config/ftpKey.jks") - .keyPassword("secret") - .trustedCA("config/cacerts") - .trustedCAPassword("secret") + + private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // + new ImmutableFtpesConfig.Builder() // + .keyCert("/config/ftpKey.jks") // + .keyPassword("secret") // + .trustedCA("config/cacerts") // + .trustedCAPassword("secret") // .build(); - //@formatter:off private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java index 1f5827c8..84c5e07b 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java @@ -44,6 +44,20 @@ public class FileDataTest { private static final String LOCATION_WITHOUT_USER = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private MessageMetaData messageMetaData() { + return ImmutableMessageMetaData.builder() + .productName("PRODUCT_NAME") + .vendorName("VENDOR_NAME") + .lastEpochMicrosec("LAST_EPOCH_MICROSEC") + .sourceName("SOURCE_NAME") + .startEpochMicrosec("START_EPOCH_MICROSEC") + .timeZoneOffset("TIME_ZONE_OFFSET") + .changeIdentifier("PM_MEAS_CHANGE_IDENTIFIER") + .changeType("FILE_READY_CHANGE_TYPE") + .build(); + } + private FileData 
properFileDataWithUser() { // @formatter:off return ImmutableFileData.builder() @@ -53,6 +67,7 @@ public class FileDataTest { .fileFormatType("type") .fileFormatVersion("version") .scheme(Scheme.FTPS) + .messageMetaData(messageMetaData()) .build(); // @formatter:on } @@ -66,6 +81,7 @@ public class FileDataTest { .fileFormatType("type") .fileFormatVersion("version") .scheme(Scheme.FTPS) + .messageMetaData(messageMetaData()) .build(); // @formatter:on } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index f7b83297..b8aa7da2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -29,7 +29,6 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -67,49 +66,47 @@ class JsonMessageParserTest { private static final String NOTIFICATION_FIELDS_VERSION = "1.0"; @Test - void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + void whenPassingCorrectJson_oneFileReadyMessage() { + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + 
.timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .build(); + FileData expectedFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + .files(files) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -122,49 +119,47 @@ class JsonMessageParserTest { } @Test - void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + void whenPassingCorrectJsonWithTwoEvents_twoMessages() { + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .build(); + FileData expectedFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); - FileReadyMessage expectedMessage = 
ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // + .files(files) // .build(); - // @formatter:on + String parsedString = message.getParsed(); String messageString = "[" + parsedString + "," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -178,21 +173,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithoutLocation_noMessage() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -205,49 +199,47 @@ class JsonMessageParserTest { } @Test - void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() { + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + 
.changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .build(); + FileData expectedFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); - FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // + .files(files) // .build(); - // @formatter:on + String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); @@ -258,21 +250,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithFaultyEventName_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName("Faulty event name") - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName("Faulty event name") // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -286,21 +277,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithoutName_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new 
JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -314,14 +304,13 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() { - // @formatter:off - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .build(); - // @formatter:on + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .build(); + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -335,21 +324,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithoutCompression_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) - .build(); - // @formatter:on + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // + .build(); + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -363,21 +351,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithoutFileFormatType_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + 
.changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -391,55 +378,53 @@ class JsonMessageParserTest { @Test void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() { - // @formatter:off - AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .build(); - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) - .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalFaultyField) - .addAdditionalField(additionalField) - .build(); - - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .build(); - FileData expectedFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .build(); + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .build(); + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalFaultyField) // + .addAdditionalField(additionalField) // + .build(); + + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(CHANGE_TYPE) // + .build(); + FileData expectedFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(expectedFileData); - FileReadyMessage expectedMessage = 
ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() // + .files(files) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -453,14 +438,13 @@ class JsonMessageParserTest { @Test void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() { - // @formatter:off - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier("PM_MEAS_FILES_INVALID") - .changeType("FileReady_INVALID") - .notificationFieldsVersion("1.0_INVALID") - .build(); - // @formatter:on + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier("PM_MEAS_FILES_INVALID") // + .changeType("FileReady_INVALID") // + .notificationFieldsVersion("1.0_INVALID") // + .build(); + String incorrectMessageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -486,21 +470,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(CHANGE_IDENTIFIER) - .changeType(INCORRECT_CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(CHANGE_IDENTIFIER) // + .changeType(INCORRECT_CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); @@ -514,21 +497,20 @@ class JsonMessageParserTest { @Test void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() { - // @formatter:off - AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() - .name(PM_FILE_NAME) - .location(LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() // + .name(PM_FILE_NAME) // + .location(LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage message = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) - .changeType(CHANGE_TYPE) - .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) - .addAdditionalField(additionalField) + JsonMessage message = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) // + 
.changeType(CHANGE_TYPE) // + .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) // + .addAdditionalField(additionalField) // .build(); - // @formatter:on + String messageString = message.toString(); String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java index f88e301d..98c7dc32 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java @@ -20,7 +20,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -28,6 +27,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; @@ -74,7 +75,7 @@ public class DMaaPMessageConsumerTaskImplTest { private static final String PORT_22 = "22"; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME; + private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME); private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; private static final String GZIP_COMPRESSION = "gzip"; @@ -86,7 +87,7 @@ public class DMaaPMessageConsumerTaskImplTest { private static AppConfig appConfig; private static DmaapConsumerConfiguration dmaapConsumerConfiguration; private DMaaPMessageConsumerTask messageConsumerTask; - private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; + private DMaaPConsumerReactiveHttpClient httpClientMock; private static String ftpesMessageString; private static FileData ftpesFileData; @@ -96,156 +97,161 @@ public class DMaaPMessageConsumerTaskImplTest { private static FileData sftpFileData; private static FileReadyMessage expectedSftpMessage; + /** + * Sets up data for the test. 
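A pattern repeated across these test updates (FileDataTest, JsonMessageParserTest, and the consumer-task test being set up here): the event-level fields pnfName and messageMetaData are dropped from FileReadyMessage, and each FileData now carries its own MessageMetaData, so downstream tasks only need the FileData instance. The builders also switch from //@formatter:off blocks to trailing // comments, which keeps the formatter from re-joining the chained calls. Below is a simplified, hand-written model of the new shape; the classes are illustrative stand-ins, not the generated Immutable* types.

import java.util.List;

public class FileReadyModelSketch {

    // Metadata taken from the VES event header.
    static class MetaData {
        String productName;
        String vendorName;
        String sourceName;
        String changeIdentifier;
        String changeType;
    }

    // Each file now owns the metadata it needs for collection and publishing.
    static class FileData {
        String name;
        String location;
        String compression;
        MetaData messageMetaData;
    }

    // The file-ready event is reduced to a plain list of files.
    static class FileReadyMessage {
        List<FileData> files;
    }
}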
+ */ @BeforeAll public static void setUp() { - //@formatter:off - dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() - .consumerGroup("OpenDCAE-c12") - .consumerId("c12") - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234).dmaapProtocol("https") - .dmaapUserName("Datafile") - .dmaapUserPassword("Datafile") - .dmaapTopicName("unauthenticated.NOTIFICATION") - .timeoutMs(-1) - .messageLimit(-1) - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) + dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() // + .consumerGroup("OpenDCAE-c12") // + .consumerId("c12") // + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234).dmaapProtocol("https") // + .dmaapUserName("Datafile") // + .dmaapUserPassword("Datafile") // + .dmaapTopicName("unauthenticated.NOTIFICATION") // + .timeoutMs(-1) // + .messageLimit(-1) // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // .build(); appConfig = mock(AppConfig.class); - AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(FTPES_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(FTPES_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(ftpesAdditionalField) + JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // + .notificationFieldsVersion("1.0") // + .addAdditionalField(ftpesAdditionalField) // .build(); ftpesMessageString = ftpesJsonMessage.toString(); - MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) + MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // .build(); - ftpesFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + ftpesFileData = ImmutableFileData.builder() // + 
.name(PM_FILE_NAME) // + .location(FTPES_LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); List<FileData> files = new ArrayList<>(); files.add(ftpesFileData); - expectedFtpesMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + expectedFtpesMessage = ImmutableFileReadyMessage.builder() // + .files(files) // .build(); - AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() - .location(SFTP_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() // + .location(SFTP_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); - JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() - .eventName(NR_RADIO_ERICSSON_EVENT_NAME) - .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) - .changeType(FILE_READY_CHANGE_TYPE) - .notificationFieldsVersion("1.0") - .addAdditionalField(sftpAdditionalField) + JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() // + .eventName(NR_RADIO_ERICSSON_EVENT_NAME) // + .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) // + .changeType(FILE_READY_CHANGE_TYPE) // + .notificationFieldsVersion("1.0") // + .addAdditionalField(sftpAdditionalField) // .build(); sftpMessageString = sftpJsonMessage.toString(); - sftpFileData = ImmutableFileData.builder() - .name(PM_FILE_NAME) - .location(SFTP_LOCATION) - .scheme(Scheme.FTPS) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + sftpFileData = ImmutableFileData.builder() // + .name(PM_FILE_NAME) // + .location(SFTP_LOCATION) // + .scheme(Scheme.FTPS) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // + .messageMetaData(messageMetaData) .build(); - ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location(FTPES_LOCATION) - .internalLocation(LOCAL_FILE_LOCATION) - .compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) - .fileFormatVersion(FILE_FORMAT_VERSION) + ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() // + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .name(PM_FILE_NAME) // + .location(FTPES_LOCATION) // + .internalLocation(LOCAL_FILE_LOCATION) // + .compression(GZIP_COMPRESSION) // + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); listOfConsumerDmaapModel.add(consumerDmaapModel); files = new ArrayList<>(); files.add(sftpFileData); - expectedSftpMessage = ImmutableFileReadyMessage.builder() - .pnfName(SOURCE_NAME) - .messageMetaData(messageMetaData) - .files(files) + expectedSftpMessage = 
ImmutableFileReadyMessage.builder() // + .files(files) // .build(); - //@formatter:on } @Test public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() { prepareMocksForDmaapConsumer("", null); - StepVerifier.create(messageConsumerTask.execute()).expectSubscription() - .expectError(DatafileTaskException.class).verify(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectSubscription() // + .expectError(DatafileTaskException.class) // + .verify(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); } @Test public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage); - StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectNext(expectedFtpesMessage) // + .verifyComplete(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(httpClientMock); } @Test public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage); - StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete(); + StepVerifier.create(messageConsumerTask.execute()) // + .expectNext(expectedSftpMessage) // + .verifyComplete(); - verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse(); - verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient); + verify(httpClientMock, times(1)).getDMaaPConsumerResponse(); + verifyNoMoreInteractions(httpClientMock); } private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) { Mono<String> messageAsMono = Mono.just(message); JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); - dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class); - when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono); + httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class); + when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono); if (!message.isEmpty()) { when(jsonMessageParserMock.getMessagesFromJson(messageAsMono)) @@ -255,9 +261,6 @@ public class DMaaPMessageConsumerTaskImplTest { .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumerTask = - spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock)); - when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration); - doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient(); + messageConsumerTask = spy(new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock)); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index d612d17c..fe867738 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -1,4 +1,4 @@ -/* 
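Worth noting in the consumer-task test above: instead of spying on the task and stubbing resolveClient()/resolveConfiguration(), the mocked HTTP client and parser are now passed straight into the constructor (new DMaaPMessageConsumerTask(httpClientMock, jsonMessageParserMock)). A minimal sketch of that wiring is shown below with generic stand-in types, so it does not depend on the project's package layout; the interfaces and values are assumptions of the example.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class ConstructorInjectionTestSketch {

    // Stand-ins for DMaaPConsumerReactiveHttpClient and JsonMessageParser.
    interface MessageClient {
        Mono<String> fetch();
    }

    interface MessageParser {
        Flux<String> parse(Mono<String> raw);
    }

    // Stand-in for the task: collaborators arrive through the constructor.
    static class ConsumerTask {
        private final MessageClient client;
        private final MessageParser parser;

        ConsumerTask(MessageClient client, MessageParser parser) {
            this.client = client;
            this.parser = parser;
        }

        Flux<String> execute() {
            return parser.parse(client.fetch());
        }
    }

    @Test
    void exampleTest() {
        MessageClient clientMock = mock(MessageClient.class);
        MessageParser parserMock = mock(MessageParser.class);
        Mono<String> raw = Mono.just("{...}");
        when(clientMock.fetch()).thenReturn(raw);
        when(parserMock.parse(raw)).thenReturn(Flux.just("parsed"));

        // No spy and no doReturn on protected factory methods needed.
        StepVerifier.create(new ConsumerTask(clientMock, parserMock).execute())
                .expectNext("parsed")
                .verifyComplete();
    }
}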
+/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -16,8 +16,9 @@ package org.onap.dcaegen2.collectors.datafile.tasks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -25,19 +26,36 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; + +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Mono; +import org.springframework.web.util.DefaultUriBuilderFactory; +import org.springframework.web.util.UriBuilder; + import reactor.test.StepVerifier; /** @@ -52,31 +70,39 @@ class DataRouterPublisherTest { private static final String START_EPOCH_MICROSEC = "8745745764578"; private static final String TIME_ZONE_OFFSET = "UTC+05:00"; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME; + private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME; + + private static final String COMPRESSION = "gzip"; + private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; + private static final String FILE_FORMAT_VERSION = "V10"; + private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; + + private static final String HOST = "54.45.33.2"; + private static final String HTTPS_SCHEME = "https"; + private static final int PORT = 1234; + private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + private static final String PUBLISH_TOPIC = "publish"; + private static final String FEED_ID = "1"; + private static final String FILE_CONTENT = "Just a string."; private static ConsumerDmaapModel consumerDmaapModel; - private static DataRouterPublisher 
dmaapPublisherTask; - private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient; + private static DmaapProducerReactiveHttpClient httpClientMock; private static AppConfig appConfig; - private static DmaapPublisherConfiguration dmaapPublisherConfiguration; + private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private final Map<String, String> contextMap = new HashMap<>(); + private static DataRouterPublisher publisherTaskUnderTestSpy; + /** + * Sets up data for tests. + */ @BeforeAll public static void setUp() { + when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST); + when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); + when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); - dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") // - .dmaapHostName("54.45.33.2") // - .dmaapPortNumber(1234) // - .dmaapProtocol("https") // - .dmaapUserName("DFC") // - .dmaapUserPassword("DFC") // - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); // - consumerDmaapModel = ImmutableConsumerDmaapModel.builder() + consumerDmaapModel = ImmutableConsumerDmaapModel.builder() // .productName(PRODUCT_NAME) // .vendorName(VENDOR_NAME) // .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // @@ -84,61 +110,144 @@ class DataRouterPublisherTest { .startEpochMicrosec(START_EPOCH_MICROSEC) // .timeZoneOffset(TIME_ZONE_OFFSET) // .name(PM_FILE_NAME) // - .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) // - .internalLocation("target/" + PM_FILE_NAME) // + .location(FTPES_ADDRESS) // + .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) // .compression("gzip") // - .fileFormatType("org.3GPP.32.435#measCollec") // - .fileFormatVersion("V10") // + .fileFormatType(FILE_FORMAT_TYPE) // + .fileFormatVersion(FILE_FORMAT_VERSION) // .build(); // appConfig = mock(AppConfig.class); - - doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); } @Test - public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(Mono.just(HttpStatus.OK)); - - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) - .expectNext(consumerDmaapModel).verifyComplete(); - - verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap)); - verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception { + prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value())); + StepVerifier + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) + .expectNext(consumerDmaapModel) // + .verifyComplete(); + + ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class); + verify(httpClientMock).getBaseUri(); + verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class)); + verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any()); + verifyNoMoreInteractions(httpClientMock); + 
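The assertions that follow pick apart the captured HttpPut: the publish URI is built as scheme://host:port/publish/&lt;feed id&gt;/&lt;local file name&gt;, the body is sent as application/octet-stream, and the file metadata travels in an X-DMAAP-DR-META header. The sketch below shows roughly how such a request can be assembled with the same Apache HttpClient types; the JSON content of the metadata header is an assumption of the sketch, not taken from the diff.

import java.net.URI;

import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;

public class PublishRequestSketch {

    static HttpPut buildPublishRequest(String host, int port, String localFileName, String metaDataJson)
            throws Exception {
        URI uri = new URIBuilder()
                .setScheme("https")
                .setHost(host)
                .setPort(port)
                .setPath("/publish/1/" + localFileName) // <topic>/<feed id>/<file name>
                .build();

        HttpPut put = new HttpPut(uri);
        put.addHeader("content-type", "application/octet-stream");
        put.addHeader("X-DMAAP-DR-META", metaDataJson); // e.g. {"productName":"...", "sourceName":"..."}
        return put;
    }
}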
+ HttpPut actualPut = (HttpPut) requestCaptor.getValue(); + URI actualUri = actualPut.getURI(); + assertEquals(HTTPS_SCHEME, actualUri.getScheme()); + assertEquals(HOST, actualUri.getHost()); + assertEquals(PORT, actualUri.getPort()); + Path actualPath = Paths.get(actualUri.getPath()); + assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString())); + assertTrue(FEED_ID.equals(actualPath.getName(1).toString())); + assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString())); + + Header[] contentHeaders = actualPut.getHeaders("content-type"); + assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue()); + + Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META); + Map<String, String> metaHash = getMetaDataAsMap(metaHeaders); + assertTrue(10 == metaHash.size()); + assertEquals(PRODUCT_NAME, metaHash.get("productName")); + assertEquals(VENDOR_NAME, metaHash.get("vendorName")); + assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec")); + assertEquals(SOURCE_NAME, metaHash.get("sourceName")); + assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec")); + assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset")); + assertEquals(COMPRESSION, metaHash.get("compression")); + assertEquals(FTPES_ADDRESS, metaHash.get("location")); + assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType")); + assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion")); } @Test - public void whenPassedObjectFits_firstFailsThenSucceeds() { - prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK)); + void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception { + prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value()); - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) - .expectNext(consumerDmaapModel).verifyComplete(); - - verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap)); - verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + StepVerifier + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), contextMap)) + .expectNext(consumerDmaapModel) // + .verifyComplete(); } @Test - public void whenPassedObjectFits_firstFailsThenFails() { - prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY)); - - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) - .expectErrorMessage("Retries exhausted: 1/1").verify(); + public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception { + prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()), + Integer.valueOf(HttpStatus.OK.value())); + + StepVerifier + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) + .expectNext(consumerDmaapModel) // + .verifyComplete(); + + verify(httpClientMock, times(2)).getBaseUri(); + verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); + verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); + verifyNoMoreInteractions(httpClientMock); + } - verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap)); - verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient); + @Test + 
public void whenPassedObjectFits_firstFailsThenFails() throws Exception { + prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()), + Integer.valueOf((HttpStatus.BAD_GATEWAY.value()))); + + StepVerifier + .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap)) + .expectErrorMessage("Retries exhausted: 1/1") // + .verify(); + + verify(httpClientMock, times(2)).getBaseUri(); + verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class)); + verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()); + verifyNoMoreInteractions(httpClientMock); } @SafeVarargs - final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) { - dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); - when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse, - nextHttpResponses); - - dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); - when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); - doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); + final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses) + throws Exception { + httpClientMock = mock(DmaapProducerReactiveHttpClient.class); + when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock); + doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(); + doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(); + + UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT); + when(httpClientMock.getBaseUri()).thenReturn(uriBuilder); + + HttpResponse httpResponseMock = mock(HttpResponse.class); + if (exception == null) { + when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any())) + .thenReturn(httpResponseMock); + } else { + when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any())) + .thenThrow(exception).thenReturn(httpResponseMock); + } + StatusLine statusLineMock = mock(StatusLine.class); + when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock); + when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses); + + InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); + doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME)); + } + + private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) { + Map<String, String> metaHash = new HashMap<>(); + String actualMetaData = metaHeaders[0].getValue(); + actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1); + actualMetaData = actualMetaData.replace("\"", ""); + String[] commaSplitedMetaData = actualMetaData.split(","); + for (int i = 0; i < commaSplitedMetaData.length; i++) { + String[] keyValuePair = commaSplitedMetaData[i].split(":"); + if (keyValuePair.length > 2) { + List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length); + for (int j = 1; j < keyValuePair.length; j++) { + arrayKeyValuePair.add(keyValuePair[j]); + } + keyValuePair[1] = String.join(":", arrayKeyValuePair); + } + metaHash.put(keyValuePair[0], keyValuePair[1]); + } + return metaHash; } } diff --git 
a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index c266d50e..6e17f27b 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -1,4 +1,4 @@ -/* +/*- * ============LICENSE_START====================================================================== * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== @@ -67,7 +67,7 @@ public class FileCollectorTest { private static final int PORT_22 = 22; private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME; - private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME); + private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SOURCE_NAME, PM_FILE_NAME); private static final String USER = "usr"; private static final String PWD = "pwd"; private static final String FTPES_LOCATION = @@ -93,6 +93,7 @@ public class FileCollectorTest { private FtpsClient ftpsClientMock = mock(FtpsClient.class); private SftpClient sftpClientMock = mock(SftpClient.class); + private final Map<String, String> contextMap = new HashMap<>(); private MessageMetaData createMessageMetaData() { @@ -119,6 +120,7 @@ public class FileCollectorTest { .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) .scheme(scheme) + .messageMetaData(createMessageMetaData()) .build(); // @formatter:on } @@ -134,7 +136,7 @@ public class FileCollectorTest { .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) .location(location) - .internalLocation(LOCAL_FILE_LOCATION.toString()) + .internalLocation(LOCAL_FILE_LOCATION) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -160,10 +162,11 @@ public class FileCollectorTest { ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + StepVerifier.create( + collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap)) .expectNext(expectedConsumerDmaapModel).verifyComplete(); + verify(ftpsClientMock, times(1)).open(); verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verify(ftpsClientMock, times(1)).close(); @@ -175,11 +178,13 @@ public class FileCollectorTest { FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); + FileData fileData = createFileData(SFTP_LOCATION_NO_PORT, Scheme.SFTP); ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT); - Map<String, String> contextMap = new HashMap<>(); - StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap)) + StepVerifier + .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), + contextMap)) .expectNext(expectedConsumerDmaapModel) // 
 .verifyComplete();
@@ -187,10 +192,13 @@ public class FileCollectorTest {
 fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
 expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier
+ .create(collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0),
+ contextMap))
 .expectNext(expectedConsumerDmaapModel) //
 .verifyComplete();
+
 verify(sftpClientMock, times(2)).open();
 verify(sftpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 verify(sftpClientMock, times(2)).close();
 verifyNoMoreInteractions(sftpClientMock);
@@ -205,8 +213,8 @@ public class FileCollectorTest {
 doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
 .expectErrorMessage("Retries exhausted: 3/3").verify();
 verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -222,11 +230,11 @@ public class FileCollectorTest {
 ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
- Map<String, String> contextMap = new HashMap<>();
- StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+
+ StepVerifier.create(
+ collectorUndetTest.execute(fileData, 3, Duration.ofSeconds(0), contextMap))
 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 }
-
 }
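The FileCollectorTest cases above lean on Reactor's retry handling: three retries plus the original attempt give times(4) on collectFile, and exhaustion surfaces as an error whose message is "Retries exhausted: 3/3". The sketch below is not DFC code; it assumes the Mono#retryBackoff operator from the Reactor 3.2 line (later deprecated in favour of retryWhen(Retry...)), which is one operator that reports exhaustion in exactly that format, and it shows the same StepVerifier pattern against a deliberately failing source.

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;

    import reactor.core.publisher.Mono;
    import reactor.test.StepVerifier;

    public class RetryVerificationSketch {

        public static void main(String[] args) {
            AtomicInteger attempts = new AtomicInteger();

            // A source that fails on every subscription; each retry re-subscribes.
            Mono<String> collected = Mono.fromCallable(() -> {
                attempts.incrementAndGet();
                throw new IllegalStateException("Unable to collect file.");
            });

            // Assumption: Mono#retryBackoff from the Reactor 3.2 line, which reports
            // exhaustion as "Retries exhausted: <retries>/<maxRetries>".
            StepVerifier.create(collected.retryBackoff(3, Duration.ofSeconds(0)))
                    .expectErrorMessage("Retries exhausted: 3/3")
                    .verify();

            // One original attempt plus three retries, matching times(4) above.
            System.out.println(attempts.get());
        }
    }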
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 00000000..3e3c2ed6
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+* Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+ private static final String EMPTY_CONTENT = "[]";
+ private static final String FEEDLOG_TOPIC = "feedlog";
+ private static final String FEED_ID = "1";
+ private static final String HTTPS_SCHEME = "https";
+ private static final String HOST = "54.45.33.2";
+ private static final int PORT = 1234;
+ private static final String SOURCE_NAME = "oteNB5309";
+ private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+ private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+ private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static AppConfig appConfigMock;
+ private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+ private PublishedChecker publishedCheckerUnderTestSpy;
+
+ /**
+ * Sets up data for the tests.
+ */
+ @BeforeAll
+ public static void setUp() {
+ when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+ when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+ when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+ appConfigMock = mock(AppConfig.class);
+ when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ }
+
+ @Test
+ public void executeWhenNotPublished_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+
+ ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+ verify(httpClientMock).getBaseUri();
+ verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+ verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+ verifyNoMoreInteractions(httpClientMock);
+
+ HttpUriRequest getRequest = requestCaptor.getValue();
+ assertTrue(getRequest instanceof HttpGet);
+ URI actualUri = getRequest.getURI();
+ assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+ assertEquals(HOST, actualUri.getHost());
+ assertEquals(PORT, actualUri.getPort());
+ Path actualPath = Paths.get(actualUri.getPath());
+ assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+ assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+ String actualQuery = actualUri.getQuery();
+ assertTrue(actualQuery.contains("type=pub"));
+ assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ }
+
+ @Test
+ public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ @Test
+ public void executeWhenPublished_returnsTrue() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertTrue(isPublished);
+ }
+
+ @Test
+ public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+ prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+ boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+ assertFalse(isPublished);
+ }
+
+ final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+ publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+ UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+ when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+ HttpResponse httpResponseMock = mock(HttpResponse.class);
+ if (exception == null) {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenReturn(httpResponseMock);
+ } else {
+ when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+ .thenThrow(exception);
+ }
+ HttpEntity httpEntityMock = mock(HttpEntity.class);
+ StatusLine statusLineMock = mock(StatusLine.class);
+ when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+ when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+ when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+ InputStream stream = new ByteArrayInputStream(content.getBytes());
+ when(httpEntityMock.getContent()).thenReturn(stream);
+ }
+}
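executeWhenNotPublished_returnsFalse above pins down the query the new PublishedChecker is expected to send to Data Router: a GET on /feedlog/<feedId> with type=pub and the local file name, where an empty JSON array in a 200 response means "not yet published". The following sketch is not the PublishedChecker implementation; it only shows how such a request could be built with the same Spring UriBuilder and Apache HttpGet types the test uses, with hypothetical sample values for host, port, feed id and file name.

    import java.net.URI;

    import org.apache.http.client.methods.HttpGet;
    import org.springframework.web.util.DefaultUriBuilderFactory;

    public class FeedlogQuerySketch {

        public static void main(String[] args) {
            // Hypothetical sample values; in DFC they would come from the DmaapPublisherConfiguration.
            String host = "54.45.33.2";
            int port = 1234;
            String feedId = "1";
            String localFileName = "oteNB5309_A20161224.1030-1045.bin.gz";

            // GET <scheme>://<host>:<port>/feedlog/<feedId>?type=pub&filename=<localFileName>,
            // the shape asserted by executeWhenNotPublished_returnsFalse above.
            URI feedlogUri = new DefaultUriBuilderFactory().builder()
                    .scheme("https")
                    .host(host)
                    .port(port)
                    .path("/feedlog/" + feedId)
                    .queryParam("type", "pub")
                    .queryParam("filename", localFileName)
                    .build();

            HttpGet getRequest = new HttpGet(feedlogUri);
            // Prints: https://54.45.33.2:1234/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
            System.out.println(getRequest.getURI());
        }
    }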
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b3891..f6beae02 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -26,9 +27,14 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +49,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -56,8 +63,10 @@ public class ScheduledTasksTest {
 private int uniqueValue = 0;
 private DMaaPMessageConsumerTask consumerMock;
+ private PublishedChecker publishedCheckerMock;
 private FileCollector fileCollectorMock;
 private DataRouterPublisher dataRouterMock;
+ private Map<String, String> contextMap = new HashMap<String, String>();
 @BeforeEach
 private void setUp() {
@@ -75,13 +84,15 @@ public class ScheduledTasksTest {
 .keyStorePasswordPath("keyStorePasswordPath") //
 .enableDmaapCertAuth(true) //
 .build(); //
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
 consumerMock = mock(DMaaPMessageConsumerTask.class);
+ publishedCheckerMock = mock(PublishedChecker.class);
 fileCollectorMock = mock(FileCollector.class);
 dataRouterMock = mock(DataRouterPublisher.class);
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
 doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
 doReturn(fileCollectorMock).when(testedObject).createFileCollector();
 doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
 }
@@ -107,6 +118,7 @@ public class ScheduledTasksTest {
 .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
 .scheme(Scheme.FTPS) //
 .compression("") //
+ .messageMetaData(messageMetaData())
 .build();
 }
@@ -122,9 +134,7 @@ public class ScheduledTasksTest {
 }
 private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
- MessageMetaData md = messageMetaData();
- return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
- .files(files(numberOfFiles, uniqueNames)).build();
+ return ImmutableFileReadyMessage.builder().files(files(numberOfFiles, uniqueNames)).build();
 }
 private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
@@ -146,7 +156,7 @@ public class ScheduledTasksTest {
 .timeZoneOffset("") //
 .name("") //
 .location("") //
- .internalLocation("internalLocation") //
+ .internalLocation(Paths.get("internalLocation")) //
 .compression("") //
 .fileFormatType("") //
 .fileFormatVersion("") //
@@ -174,18 +184,20 @@ public class ScheduledTasksTest {
 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
 doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
 .expectNextCount(noOfFiles) //
 .expectComplete() //
 .verify(); //
 assertEquals(0, testedObject.getCurrentNumberOfTasks());
 verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), notNull());
 verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull(), any());
 verifyNoMoreInteractions(dataRouterMock);
 verifyNoMoreInteractions(fileCollectorMock);
@@ -197,25 +209,27 @@ public class ScheduledTasksTest {
 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
 doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
 Mono<Object> error = Mono.error(new Exception("problem"));
 // First file collect will fail, 3 will succeed
 doReturn(error, collectedFile, collectedFile, collectedFile) //
 .when(fileCollectorMock) //
- .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class), any());
+ .execute(any(FileData.class), anyLong(), any(Duration.class), notNull());
 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
 .expectNextCount(3) //
 .expectComplete() //
 .verify(); //
 assertEquals(0, testedObject.getCurrentNumberOfTasks());
 verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
 verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull(), any());
 verifyNoMoreInteractions(dataRouterMock);
 verifyNoMoreInteractions(fileCollectorMock);
@@ -228,8 +242,10 @@
 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
 doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
 Mono<Object> error = Mono.error(new Exception("problem"));
 // One publish will fail, the rest will succeed
 doReturn(collectedFile, error, collectedFile, collectedFile) //
 .when(dataRouterMock) //
 .execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
 .expectNextCount(3) // 3 completed files
 .expectComplete() //
 .verify(); //
 assertEquals(0, testedObject.getCurrentNumberOfTasks());
 verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(4)).execute(notNull(), anyLong(), notNull(), notNull());
 verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull(), any());
 verifyNoMoreInteractions(dataRouterMock);
 verifyNoMoreInteractions(fileCollectorMock);
@@ -260,18 +276,20 @@
 Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
 doReturn(fileReadyMessages).when(consumerMock).execute();
+ doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
 Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
- doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), anyLong(), notNull(), notNull());
 doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
- StepVerifier.create(testedObject.createMainTask(any())).expectSubscription() //
+ StepVerifier.create(testedObject.createMainTask(contextMap)).expectSubscription() //
 .expectNextCount(1) // 99 is skipped
 .expectComplete() //
 .verify(); //
 assertEquals(0, testedObject.getCurrentNumberOfTasks());
 verify(consumerMock, times(1)).execute();
- verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull(), any());
+ verify(fileCollectorMock, times(1)).execute(notNull(), anyLong(), notNull(), notNull());
 verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull(), any());
 verifyNoMoreInteractions(dataRouterMock);
 verifyNoMoreInteractions(fileCollectorMock);
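Taken together, the ScheduledTasksTest changes describe the intended main flow: consume file-ready events, skip files the new PublishedChecker reports as already published, collect the rest with FileCollector, publish them with DataRouterPublisher, and let a failure for one file drop only that file (hence expectNextCount(3) for four files). The sketch below is a deliberately simplified illustration of that pipeline, not the actual ScheduledTasks implementation; the nested interfaces are stand-ins for the mocked collaborators and a file-ready event is reduced to a plain file name.

    import java.time.Duration;
    import java.util.Map;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class MainTaskSketch {

        // Stand-ins for the mocked collaborators in ScheduledTasksTest.
        interface Consumer { Flux<String> execute(); }
        interface PublishedChecker { boolean execute(String fileName, Map<String, String> contextMap); }
        interface FileCollector { Mono<String> execute(String fileName, long retries, Duration delay, Map<String, String> contextMap); }
        interface DataRouterPublisher { Mono<String> execute(String file, long retries, Duration delay, Map<String, String> contextMap); }

        Flux<String> createMainTask(Consumer consumer, PublishedChecker checker, FileCollector collector,
                DataRouterPublisher publisher, Map<String, String> contextMap) {
            return consumer.execute()
                    // Skip files Data Router already knows about, so a restart does not republish them.
                    .filter(fileName -> !checker.execute(fileName, contextMap))
                    // Collect each remaining file; a failed collection drops only that file.
                    .flatMap(fileName -> collector.execute(fileName, 3, Duration.ofSeconds(0), contextMap)
                            .onErrorResume(e -> Mono.empty()))
                    // Publish; again a single failure drops only the affected file.
                    .flatMap(file -> publisher.execute(file, 3, Duration.ofSeconds(0), contextMap)
                            .onErrorResume(e -> Mono.empty()));
        }
    }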