diff options
Diffstat (limited to 'datafile-app-server/src')
17 files changed, 764 insertions, 556 deletions
diff --git a/datafile-app-server/src/main/docker/Dockerfile b/datafile-app-server/src/main/docker/Dockerfile new file mode 100644 index 00000000..6ab30cc2 --- /dev/null +++ b/datafile-app-server/src/main/docker/Dockerfile @@ -0,0 +1,41 @@ +# +# ============LICENSE_START======================================================= +# Copyright (C) 2019 Nordix Foundation. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# ============LICENSE_END========================================================= +# +FROM openjdk:8-jre-alpine + +WORKDIR /opt/app/datafile +RUN mkdir -p /var/log/ONAP + +ADD /target/datafile-app-server.jar /opt/app/datafile/ + +ADD /config/application.yaml /opt/app/datafile/config/ +ADD /config/cacerts /opt/app/datafile/config/ +ADD /config/datafile_endpoints.json /opt/app/datafile/config/ +ADD /config/ftpKey.jks /opt/app/datafile/config/ +ADD /config/keystore /opt/app/datafile/config/ + +EXPOSE 8100 8433 + +RUN addgroup -S onap && adduser -S datafile -G onap +RUN chown -R datafile:onap /opt/app/datafile +RUN chown -R datafile:onap /var/log/ONAP + +USER datafile + +ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/datafile/datafile-app-server.jar"]
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 5bbacb14..40de33dd 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -20,14 +20,21 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; +import java.io.*; +import java.util.ServiceLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.stereotype.Component; - -import java.util.Optional; -import java.util.function.Predicate; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -35,199 +42,95 @@ import java.util.function.Predicate; */ @Component -@Configuration -public class AppConfig extends DatafileAppConfig { - - private static Predicate<String> isEmpty = String::isEmpty; - @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}") - public String consumerDmaapHostName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapPortNumber:}") - public Integer consumerDmaapPortNumber; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapTopicName:}") - public String consumerDmaapTopicName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapProtocol:}") - public String consumerDmaapProtocol; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserName:}") - public String consumerDmaapUserName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserPassword:}") - public String consumerDmaapUserPassword; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapContentType:}") - public String consumerDmaapContentType; - - @Value("${dmaap.dmaapConsumerConfiguration.consumerId:}") - public String consumerId; - - @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}") - public String consumerGroup; - - @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}") - public Integer consumerTimeoutMs; - - @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}") - public Integer consumerMessageLimit; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapHostName:}") - public String producerDmaapHostName; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapPortNumber:}") - public Integer producerDmaapPortNumber; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapTopicName:}") - public String producerDmaapTopicName; +@EnableConfigurationProperties +@ConfigurationProperties("app") +public class AppConfig { - @Value("${dmaap.dmaapProducerConfiguration.dmaapProtocol:}") - public String producerDmaapProtocol; + private static final String CONFIG = "configs"; + private static final String DMAAP = "dmaap"; + private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; + private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; + private static final String FTP = "ftp"; + private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; + private static final String SECURITY = "security"; + private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - @Value("${dmaap.dmaapProducerConfiguration.dmaapUserName:}") - public String producerDmaapUserName; + DmaapConsumerConfiguration dmaapConsumerConfiguration; - @Value("${dmaap.dmaapProducerConfiguration.dmaapUserPassword:}") - public String producerDmaapUserPassword; + DmaapPublisherConfiguration dmaapPublisherConfiguration; - @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}") - public String producerDmaapContentType; + FtpesConfig ftpesConfig; - @Value("${ftp.ftpesConfiguration.keyCert:}") - public String keyCert; + @NotEmpty + private String filepath; - @Value("${ftp.ftpesConfiguration.keyPassword:}") - public String keyPassword; - - @Value("${ftp.ftpesConfiguration.trustedCA:}") - public String trustedCA; + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return dmaapConsumerConfiguration; + } - @Value("${ftp.ftpesConfiguration.trustedCAPassword:}") - public String trustedCAPassword; + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return dmaapPublisherConfiguration; + } - @Value("${security.trustStorePath:}") - public String trustStorePath; + public FtpesConfig getFtpesConfiguration() { + return ftpesConfig; + } - @Value("${security.trustStorePasswordPath:}") - public String trustStorePasswordPath; + public void initFileStreamReader() { + + GsonBuilder gsonBuilder = new GsonBuilder(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + JsonParser parser = new JsonParser(); + JsonObject jsonObject; + try (InputStream inputStream = getInputStream(filepath)) { + JsonElement rootElement = getJsonElement(parser, inputStream); + if (rootElement.isJsonObject()) { + jsonObject = rootElement.getAsJsonObject(); + ftpesConfig = deserializeType(gsonBuilder, + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), + FtpesConfig.class); + dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapConsumerConfiguration.class); + + dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapPublisherConfiguration.class); + } + } catch (IOException e) { + logger.error("Problem with file loading, file: {}", filepath, e); + } catch (JsonSyntaxException e) { + logger.error("Problem with Json deserialization", e); + } + } - @Value("${security.keyStorePath:}") - public String keyStorePath; + JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { + return parser.parse(new InputStreamReader(inputStream)); + } - @Value("${security.keyStorePasswordPath:}") - public String keyStorePasswordPath; + private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, + @NotNull Class<T> type) { + return gsonBuilder.create().fromJson(jsonObject, type); + } - @Value("${security.enableDmaapCertAuth:}") - public Boolean enableDmaapCertAuth; + InputStream getInputStream(@NotNull String filepath) throws IOException { + return new BufferedInputStream(new FileInputStream(filepath)); + } - @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapUserPassword( - Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapUserPassword())) - .dmaapUserName( - Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapUserName())) - .dmaapHostName( - Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapHostName())) - .dmaapPortNumber( - Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.dmaapPortNumber())) - .dmaapProtocol( - Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapProtocol())) - .dmaapContentType( - Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapContentType())) - .dmaapTopicName( - Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapTopicName())) - .messageLimit( - Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.messageLimit())) - .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.timeoutMs())) - .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.consumerGroup())) - .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.consumerId())) - .trustStorePath( - Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.trustStorePath())) - .trustStorePasswordPath( - Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.trustStorePasswordPath())) - .keyStorePath( - Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.keyStorePath())) - .keyStorePasswordPath( - Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.keyStorePasswordPath())) - .enableDmaapCertAuth( - Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.enableDmaapCertAuth())) - .build(); + String getFilepath() { + return this.filepath; } - @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType( - Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapContentType())) - .dmaapHostName( - Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapHostName())) - .dmaapPortNumber( - Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapPublisherConfiguration.dmaapPortNumber())) - .dmaapProtocol( - Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapProtocol())) - .dmaapTopicName( - Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapTopicName())) - .dmaapUserName( - Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapUserName())) - .dmaapUserPassword( - Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapUserPassword())) - .trustStorePath( - Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.trustStorePath())) - .trustStorePasswordPath( - Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.trustStorePasswordPath())) - .keyStorePath( - Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.keyStorePath())) - .keyStorePasswordPath( - Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.keyStorePasswordPath())) - .enableDmaapCertAuth( - Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapPublisherConfiguration.enableDmaapCertAuth())) - .build(); + public void setFilepath(String filepath) { + this.filepath = filepath; } - @Override - public FtpesConfig getFtpesConfiguration() { - return new ImmutableFtpesConfig.Builder() - .keyCert( - Optional.ofNullable(keyCert).filter(isEmpty.negate()) - .orElse(ftpesConfig.keyCert())) - .keyPassword( - Optional.ofNullable(keyPassword).filter(isEmpty.negate()) - .orElse(ftpesConfig.keyPassword())) - .trustedCA( - Optional.ofNullable(trustedCA).filter(isEmpty.negate()) - .orElse(ftpesConfig.trustedCA())) - .trustedCAPassword( - Optional.ofNullable(trustedCAPassword).filter(isEmpty.negate()) - .orElse(ftpesConfig.trustedCAPassword())) - .build(); + private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { + source.entrySet() + .forEach(entry -> target.add(entry.getKey(), entry.getValue())); + return target; } + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java deleted file mode 100644 index 7fe2561c..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -public interface Config { - - DmaapConsumerConfiguration getDmaapConsumerConfiguration(); - - DmaapPublisherConfiguration getDmaapPublisherConfiguration(); - - FtpesConfig getFtpesConfiguration(); - - void initFileStreamReader(); - -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java deleted file mode 100644 index 59bb259d..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ServiceLoader; - -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - */ -@Configuration -@EnableConfigurationProperties -@ConfigurationProperties("app") -public abstract class DatafileAppConfig implements Config { - - private static final String CONFIG = "configs"; - private static final String DMAAP = "dmaap"; - private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String FTP = "ftp"; - private static final String FTPES_CONFIGURATION = "ftpesConfiguration"; - private static final String SECURITY = "security"; - private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class); - - DmaapConsumerConfiguration dmaapConsumerConfiguration; - - DmaapPublisherConfiguration dmaapPublisherConfiguration; - - FtpesConfig ftpesConfig; - - @NotEmpty - private String filepath; - - - @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerConfiguration; - } - - @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; - } - - @Override - public FtpesConfig getFtpesConfiguration() { - return ftpesConfig; - } - - @Override - public void initFileStreamReader() { - - GsonBuilder gsonBuilder = new GsonBuilder(); - ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - JsonParser parser = new JsonParser(); - JsonObject jsonObject; - try (InputStream inputStream = getInputStream(filepath)) { - JsonElement rootElement = getJsonElement(parser, inputStream); - if (rootElement.isJsonObject()) { - jsonObject = rootElement.getAsJsonObject(); - ftpesConfig = deserializeType(gsonBuilder, - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), - FtpesConfig.class); - dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - - dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - } - } catch (IOException e) { - logger.error("Problem with file loading, file: {}", filepath, e); - } catch (JsonSyntaxException e) { - logger.error("Problem with Json deserialization", e); - } - } - - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream)); - } - - private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { - return gsonBuilder.create().fromJson(jsonObject, type); - } - - InputStream getInputStream(@NotNull String filepath) throws IOException { - return new BufferedInputStream(new FileInputStream(filepath)); - } - - String getFilepath() { - return this.filepath; - } - - public void setFilepath(String filepath) { - this.filepath = filepath; - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet() - .forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 478ae309..bc21f96c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -21,7 +21,9 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; + import javax.annotation.PostConstruct; + import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @@ -29,6 +31,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; + import io.swagger.annotations.ApiOperation; import reactor.core.publisher.Mono; @@ -37,10 +40,11 @@ import reactor.core.publisher.Mono; */ @Configuration @EnableScheduling -public class SchedulerConfig extends DatafileAppConfig { +public class SchedulerConfig { - private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15; - private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; + private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15); + private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5); + private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1); private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; @@ -77,11 +81,13 @@ public class SchedulerConfig extends DatafileAppConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { - scheduledFutureList.add(taskScheduler - .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), - Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); + scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, - Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS))); + SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)); + scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()), + SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE)); + return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3c606deb..a8f79ea1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -123,12 +123,11 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transformMessages(monoJsonP)); + return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Flux<FileReadyMessage> transformMessages(JsonObject message) { + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); @@ -138,22 +137,22 @@ public class JsonMessageParser { if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); // @formatter:off - return Flux.just(ImmutableFileReadyMessage.builder() + return Mono.just(ImmutableFileReadyMessage.builder() .pnfName(messageMetaData.sourceName()) .messageMetaData(messageMetaData) .files(allFileDataFromJson) .build()); // @formatter:on } else { - return Flux.empty(); + return Mono.empty(); } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); + return Mono.empty(); } logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); + return Mono.empty(); } private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java new file mode 100644 index 00000000..2cb84112 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java @@ -0,0 +1,59 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ +package org.onap.dcaegen2.collectors.datafile.service; + +import java.nio.file.Path; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * A cache of all files that already has been published. Key is the local file path and the value is + * a time stamp, when the key was last used. + */ +public class PublishedFileCache { + private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>()); + + public Instant put(Path path) { + return publishedFiles.put(path, Instant.now()); + } + + public void remove(Path localFileName) { + publishedFiles.remove(localFileName); + } + + public void purge(Instant now) { + for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) { + Map.Entry<Path, Instant> pair = it.next(); + if (isCachedPublishedFileOutdated(now, pair.getValue())) { + it.remove(); + } + } + } + + public int size() { + return publishedFiles.size(); + } + + private boolean isCachedPublishedFileOutdated(Instant now, Instant then) { + final int timeToKeepInfoInSeconds = 60 * 60 * 24; + return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds; + } + + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java index c41dce5b..f6daf733 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java @@ -22,7 +22,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; @@ -31,9 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.reactive.function.client.WebClient; - -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> @@ -41,7 +39,7 @@ import reactor.core.publisher.Mono; public class DMaaPMessageConsumerTask { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class); - private Config datafileAppConfig; + private AppConfig datafileAppConfig; private JsonMessageParser jsonMessageParser; private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index b65ddd63..4c0dcce5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.time.Duration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient; @@ -28,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -37,7 +36,7 @@ import reactor.core.publisher.Flux; public class DataRouterPublisher { private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); - private final Config datafileAppConfig; + private final AppConfig datafileAppConfig; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -51,27 +50,27 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off - return Flux.just(model) - .cache(1) + return Mono.just(model) + .cache() .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Flux.just(model); + return Mono.just(model); } else { - logger.warn("Publish to DR unsuccessful, response code: " + response); - return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + logger.warn("Publish to DR unsuccessful, response code: {}", response); + return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index db18ac2a..0b647bf5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -20,7 +20,6 @@ import java.nio.file.Path; import java.time.Duration; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient; @@ -41,7 +40,7 @@ import reactor.core.publisher.Mono; public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); - private Config datafileAppConfig; + private AppConfig datafileAppConfig; private final FtpsClient ftpsClient; private final SftpClient sftpClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index f22c7bf9..783c699c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -20,11 +20,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -34,6 +32,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,22 +40,23 @@ import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; /** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 - * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> + * This implements the main flow of the data file collector. Fetch file ready events from the + * message router, fetch new files from the PNF publish these in the data router. */ @Component public class ScheduledTasks { private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200; + private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10; - /** Data needed for fetching of files from one PNF */ + /** Data needed for fetching of one file */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES - // event + final FileCollector collectorTask; final MessageMetaData metaData; FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { @@ -68,15 +68,15 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private final AppConfig applicationConfiguration; - private final AtomicInteger taskCounter = new AtomicInteger(); - private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>()); + private final AtomicInteger currentNumberOfTasks = new AtomicInteger(); + private final Scheduler scheduler = + Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS); + PublishedFileCache alreadyPublishedFiles = new PublishedFileCache(); /** * Constructor for task registration in Datafile Workflow. * * @param applicationConfiguration - application configuration - * @param xnfCollectorTask - second task - * @param dmaapPublisherTask - third task */ @Autowired public ScheduledTasks(AppConfig applicationConfiguration) { @@ -84,52 +84,67 @@ public class ScheduledTasks { } /** - * Main function for scheduling Datafile Workflow. + * Main function for scheduling for the file collection Workflow. */ public void scheduleMainDatafileEventTask() { logger.trace("Execution of tasks was registered"); applicationConfiguration.initFileStreamReader(); - //@formatter:off - consumeMessagesFromDmaap() - .parallel() // Each FileReadyMessage in a separate thread - .runOn(Schedulers.parallel()) - .flatMap(this::createFileCollectionTask) - .filter(this::shouldBePublished) - .doOnNext(fileData -> taskCounter.incrementAndGet()) - .flatMap(this::collectFileFromXnf) - .flatMap(this::publishToDataRouter) - .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) - .doOnNext(model -> taskCounter.decrementAndGet()) - .sequential() - .subscribe(this::onSuccess, this::onError, this::onComplete); - //@formatter:on + createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete); + } + + Flux<ConsumerDmaapModel> createMainTask() { + return fetchMoreFileReadyMessages() // + .parallel(getParallelism()) // Each FileReadyMessage in a separate thread + .runOn(scheduler) // + .flatMap(this::createFileCollectionTask) // + .filter(this::shouldBePublished) // + .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) // + .flatMap(this::collectFileFromXnf) // + .flatMap(this::publishToDataRouter) // + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) // + .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) // + .sequential(); + } + + /** + * called in regular intervals to remove out-dated cached information + */ + public void purgeCachedInformation(Instant now) { + alreadyPublishedFiles.purge(now); } private void onComplete() { logger.info("Datafile tasks have been completed"); } - private void onSuccess(Path localFile) { - logger.info("Datafile consumed tasks." + localFile); + private void onSuccess(ConsumerDmaapModel model) { + logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } private void onError(Throwable throwable) { logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable); } + private int getParallelism() { + if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) { + return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks(); + } else { + return 1; // We need at least one rail/thread + } + } + private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - FileCollector task = new FileCollector(applicationConfiguration, - new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData())); - fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData())); + fileCollects.add( + new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } private boolean shouldBePublished(FileCollectionData task) { - return alreadyPublishedFiles.add(task.fileData.getLocalFileName()); + return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null; } private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) { @@ -138,48 +153,49 @@ public class ScheduledTasks { return fileCollect.collectorTask .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { - logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); - deleteFile(fileData.getLocalFileName()); - alreadyPublishedFiles.remove(fileData.getLocalFileName()); - taskCounter.decrementAndGet(); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) { + Path localFileName = fileData.getLocalFileName(); + logger.error("File fetching failed: {}", localFileName); + deleteFile(localFileName); + alreadyPublishedFiles.remove(localFileName); + currentNumberOfTasks.decrementAndGet(); return Mono.empty(); } - private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); - DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration); + DataRouterPublisher publisherTask = createDataRouterPublisher(); return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout) .onErrorResume(exception -> handlePublishFailure(model, exception)); - } - private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); - taskCounter.decrementAndGet(); - return Flux.empty(); + currentNumberOfTasks.decrementAndGet(); + return Mono.empty(); } - private Flux<FileReadyMessage> consumeMessagesFromDmaap() { - final int currentNumberOfTasks = taskCounter.get(); - logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks); - if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) { + /** + * Fetch more messages from the message router. This is done in a polling/blocking fashion. + */ + private Flux<FileReadyMessage> fetchMoreFileReadyMessages() { + logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks()); + if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) { return Flux.empty(); } - final DMaaPMessageConsumerTask messageConsumerTask = - new DMaaPMessageConsumerTask(this.applicationConfiguration); - return messageConsumerTask.execute() - .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + return createConsumerTask() // + .execute() // + .onErrorResume(this::handleConsumeMessageFailure); } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { @@ -187,13 +203,30 @@ public class ScheduledTasks { return Flux.empty(); } - private Flux<Path> deleteFile(Path localFile) { + private void deleteFile(Path localFile) { logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); } catch (Exception e) { - logger.warn("Could not delete file: {}, {}", localFile, e); + logger.trace("Could not delete file: {}", localFile); } - return Flux.just(localFile); } + + int getCurrentNumberOfTasks() { + return currentNumberOfTasks.get(); + } + + DMaaPMessageConsumerTask createConsumerTask() { + return new DMaaPMessageConsumerTask(this.applicationConfiguration); + } + + FileCollector createFileCollector(FileData fileData) { + return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), + new SftpClient(fileData.fileServerData())); + } + + DataRouterPublisher createDataRouterPublisher() { + return new DataRouterPublisher(applicationConfiguration); + } + } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java index 2cd854af..443ddae7 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java @@ -46,33 +46,32 @@ import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoE * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> */ @ExtendWith({MockitoExtension.class}) -class DatafileAppConfigTest { - +class AppConfigTest { + private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json"; private static final boolean CORRECT_JSON = true; private static final boolean INCORRECT_JSON = false; - private static DatafileAppConfig datafileAppConfig; - private static AppConfig appConfig; + private static AppConfig appConfigUnderTest; + private static String filePath = Objects - .requireNonNull(DatafileAppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); + .requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile(); @BeforeEach public void setUp() { - datafileAppConfig = spy(DatafileAppConfig.class); - appConfig = spy(new AppConfig()); + appConfigUnderTest = spy(AppConfig.class); } @Test public void whenApplicationWasStarted_FilePathIsSet() { // When - datafileAppConfig.setFilepath(filePath); + appConfigUnderTest.setFilepath(filePath); // Then - verify(datafileAppConfig, times(1)).setFilepath(anyString()); - verify(datafileAppConfig, times(0)).initFileStreamReader(); - Assertions.assertEquals(filePath, datafileAppConfig.getFilepath()); + verify(appConfigUnderTest, times(1)).setFilepath(anyString()); + verify(appConfigUnderTest, times(0)).initFileStreamReader(); + Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath()); } @Test @@ -82,23 +81,23 @@ class DatafileAppConfigTest { new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); // When - datafileAppConfig.setFilepath(filePath); - doReturn(inputStream).when(datafileAppConfig).getInputStream(any()); - datafileAppConfig.initFileStreamReader(); - appConfig.dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); - appConfig.dmaapPublisherConfiguration = datafileAppConfig.getDmaapPublisherConfiguration(); - appConfig.ftpesConfig = datafileAppConfig.getFtpesConfiguration(); + appConfigUnderTest.setFilepath(filePath); + doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); + appConfigUnderTest.initFileStreamReader(); + appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration(); + appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration(); + appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); // Then - verify(datafileAppConfig, times(1)).setFilepath(anyString()); - verify(datafileAppConfig, times(1)).initFileStreamReader(); - Assertions.assertNotNull(datafileAppConfig.getDmaapConsumerConfiguration()); - Assertions.assertNotNull(datafileAppConfig.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfig.getDmaapPublisherConfiguration(), - datafileAppConfig.getDmaapPublisherConfiguration()); - Assertions.assertEquals(appConfig.getDmaapConsumerConfiguration(), - datafileAppConfig.getDmaapConsumerConfiguration()); - Assertions.assertEquals(appConfig.getFtpesConfiguration(), datafileAppConfig.getFtpesConfiguration()); + verify(appConfigUnderTest, times(1)).setFilepath(anyString()); + verify(appConfigUnderTest, times(1)).initFileStreamReader(); + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(), + appConfigUnderTest.getDmaapPublisherConfiguration()); + Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(), + appConfigUnderTest.getDmaapConsumerConfiguration()); + Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration()); } @@ -106,17 +105,17 @@ class DatafileAppConfigTest { public void whenFileIsNotExist_ThrowIoException() { // Given filePath = "/temp.json"; - datafileAppConfig.setFilepath(filePath); + appConfigUnderTest.setFilepath(filePath); // When - datafileAppConfig.initFileStreamReader(); + appConfigUnderTest.initFileStreamReader(); // Then - verify(datafileAppConfig, times(1)).setFilepath(anyString()); - verify(datafileAppConfig, times(1)).initFileStreamReader(); - Assertions.assertNull(datafileAppConfig.getDmaapConsumerConfiguration()); - Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration()); - Assertions.assertNull(datafileAppConfig.getFtpesConfiguration()); + verify(appConfigUnderTest, times(1)).setFilepath(anyString()); + verify(appConfigUnderTest, times(1)).initFileStreamReader(); + Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } @@ -127,16 +126,16 @@ class DatafileAppConfigTest { new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8))); // When - datafileAppConfig.setFilepath(filePath); - doReturn(inputStream).when(datafileAppConfig).getInputStream(any()); - datafileAppConfig.initFileStreamReader(); + appConfigUnderTest.setFilepath(filePath); + doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); + appConfigUnderTest.initFileStreamReader(); // Then - verify(datafileAppConfig, times(1)).setFilepath(anyString()); - verify(datafileAppConfig, times(1)).initFileStreamReader(); - Assertions.assertNotNull(datafileAppConfig.getDmaapConsumerConfiguration()); - Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration()); - Assertions.assertNotNull(datafileAppConfig.getFtpesConfiguration()); + verify(appConfigUnderTest, times(1)).setFilepath(anyString()); + verify(appConfigUnderTest, times(1)).initFileStreamReader(); + Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + Assertions.assertNotNull(appConfigUnderTest.getFtpesConfiguration()); } @@ -147,22 +146,22 @@ class DatafileAppConfigTest { InputStream inputStream = new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8))); // When - datafileAppConfig.setFilepath(filePath); - doReturn(inputStream).when(datafileAppConfig).getInputStream(any()); + appConfigUnderTest.setFilepath(filePath); + doReturn(inputStream).when(appConfigUnderTest).getInputStream(any()); JsonElement jsonElement = mock(JsonElement.class); when(jsonElement.isJsonObject()).thenReturn(false); - doReturn(jsonElement).when(datafileAppConfig).getJsonElement(any(JsonParser.class), any(InputStream.class)); - datafileAppConfig.initFileStreamReader(); - appConfig.dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); - appConfig.dmaapPublisherConfiguration = datafileAppConfig.getDmaapPublisherConfiguration(); - appConfig.ftpesConfig = datafileAppConfig.getFtpesConfiguration(); + doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class)); + appConfigUnderTest.initFileStreamReader(); + appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration(); + appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration(); + appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); // Then - verify(datafileAppConfig, times(1)).setFilepath(anyString()); - verify(datafileAppConfig, times(1)).initFileStreamReader(); - Assertions.assertNull(datafileAppConfig.getDmaapConsumerConfiguration()); - Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration()); - Assertions.assertNull(datafileAppConfig.getFtpesConfiguration()); + verify(appConfigUnderTest, times(1)).setFilepath(anyString()); + verify(appConfigUnderTest, times(1)).initFileStreamReader(); + Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration()); + Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration()); + Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration()); } private String getJsonConfig(boolean correct) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java index 05a4f515..0d5ea003 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * PROJECT * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ package org.onap.dcaegen2.collectors.datafile.integration; import static org.mockito.Mockito.mock; -import org.onap.dcaegen2.collectors.datafile.configuration.DatafileAppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,8 +34,8 @@ import org.springframework.context.annotation.Configuration; class ServiceMockProvider { @Bean - public DatafileAppConfig getDatafileAppConfig() { - return mock(DatafileAppConfig.class); + public AppConfig getDatafileAppConfig() { + return mock(AppConfig.class); } @Bean diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java new file mode 100644 index 00000000..7b38ee42 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class PublishedFileCacheTest { + + private static PublishedFileCache testObject; + + @BeforeAll + public static void setUp() { + testObject = new PublishedFileCache(); + } + + @Test + public void purgeFiles_timeNotExpired() { + Assertions.assertNull(testObject.put(Paths.get("A"))); + Assertions.assertNotNull(testObject.put(Paths.get("A"))); + testObject.put(Paths.get("B")); + + testObject.purge(Instant.now()); + Assertions.assertEquals(2, testObject.size()); + } + + @Test + public void purgeFiles_timeExpired() { + testObject.put(Paths.get("A")); + testObject.put(Paths.get("B")); + testObject.put(Paths.get("C")); + + testObject.purge(Instant.MAX); + Assertions.assertEquals(0, testObject.size()); + } + + @Test + public void purgeFiles_remove() { + Path path = Paths.get("A"); + testObject.put(path); + Assertions.assertEquals(1, testObject.size()); + testObject.remove(path); + Assertions.assertEquals(0, testObject.size()); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 73511d19..24b82fe6 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** @@ -61,42 +61,43 @@ class DataRouterPublisherTest { @BeforeAll public static void setUp() { - //@formatter:off + dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType("application/json") - .dmaapHostName("54.45.33.2") - .dmaapPortNumber(1234) - .dmaapProtocol("https") - .dmaapUserName("DFC") - .dmaapUserPassword("DFC") - .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") - .trustStorePath("trustStorePath") - .trustStorePasswordPath("trustStorePasswordPath") - .keyStorePath("keyStorePath") - .keyStorePasswordPath("keyStorePasswordPath") - .enableDmaapCertAuth(true) - .build(); + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // consumerDmaapModel = ImmutableConsumerDmaapModel.builder() - .productName(PRODUCT_NAME) - .vendorName(VENDOR_NAME) - .lastEpochMicrosec(LAST_EPOCH_MICROSEC) - .sourceName(SOURCE_NAME) - .startEpochMicrosec(START_EPOCH_MICROSEC) - .timeZoneOffset(TIME_ZONE_OFFSET) - .name(PM_FILE_NAME) - .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) - .internalLocation("target/" + PM_FILE_NAME) - .compression("gzip") - .fileFormatType("org.3GPP.32.435#measCollec") - .fileFormatVersion("V10") - .build(); + .productName(PRODUCT_NAME) // + .vendorName(VENDOR_NAME) // + .lastEpochMicrosec(LAST_EPOCH_MICROSEC) // + .sourceName(SOURCE_NAME) // + .startEpochMicrosec(START_EPOCH_MICROSEC) // + .timeZoneOffset(TIME_ZONE_OFFSET) // + .name(PM_FILE_NAME) // + .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) // + .internalLocation("target/" + PM_FILE_NAME) // + .compression("gzip") // + .fileFormatType("org.3GPP.32.435#measCollec") // + .fileFormatVersion("V10") // + .build(); // appConfig = mock(AppConfig.class); - //@formatter:on + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); } @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -107,7 +108,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenSucceeds() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -118,7 +119,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenFails() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectErrorMessage("Retries exhausted: 1/1").verify(); @@ -128,11 +129,11 @@ class DataRouterPublisherTest { } @SafeVarargs - final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { + final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, nextHttpResponses); - when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration); + dmaapPublisherTask = spy(new DataRouterPublisher(appConfig)); when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration); doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient(); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java new file mode 100644 index 00000000..0662216b --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -0,0 +1,285 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2019 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.tasks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; +import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; +import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class ScheduledTasksTest { + + private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz"; + + private AppConfig appConfig = mock(AppConfig.class); + private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig)); + + private int uniqueValue = 0; + private DMaaPMessageConsumerTask consumerMock; + private FileCollector fileCollectorMock; + private DataRouterPublisher dataRouterMock; + + @BeforeEach + private void setUp() { + DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() // + .dmaapContentType("application/json") // + .dmaapHostName("54.45.33.2") // + .dmaapPortNumber(1234) // + .dmaapProtocol("https") // + .dmaapUserName("DFC") // + .dmaapUserPassword("DFC") // + .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") // + .trustStorePath("trustStorePath") // + .trustStorePasswordPath("trustStorePasswordPath") // + .keyStorePath("keyStorePath") // + .keyStorePasswordPath("keyStorePasswordPath") // + .enableDmaapCertAuth(true) // + .build(); // + + consumerMock = mock(DMaaPMessageConsumerTask.class); + fileCollectorMock = mock(FileCollector.class); + dataRouterMock = mock(DataRouterPublisher.class); + + doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration(); + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull()); + doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher(); + } + + private MessageMetaData messageMetaData() { + return ImmutableMessageMetaData.builder() // + .productName("productName") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .changeIdentifier("") // + .changeType("") // + .build(); + } + + private FileData fileData(int instanceNumber) { + return ImmutableFileData.builder() // + .name("name" + instanceNumber) // + .fileFormatType("") // + .fileFormatVersion("") // + .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) // + .scheme(Scheme.FTPS) // + .compression("") // + .build(); + } + + private List<FileData> files(int size, boolean uniqueNames) { + List<FileData> list = new LinkedList<FileData>(); + for (int i = 0; i < size; ++i) { + if (uniqueNames) { + ++uniqueValue; + } + list.add(fileData(uniqueValue)); + } + return list; + } + + private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) { + MessageMetaData md = messageMetaData(); + return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md) + .files(files(numberOfFiles, uniqueNames)).build(); + } + + private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) { + List<FileReadyMessage> list = new LinkedList<FileReadyMessage>(); + for (int i = 0; i < numberOfEvents; ++i) { + list.add(createFileReadyMessage(filesPerEvent, uniqueNames)); + } + return Flux.fromIterable(list); + } + + private ConsumerDmaapModel consumerData() { + return ImmutableConsumerDmaapModel // + .builder() // + .productName("") // + .vendorName("") // + .lastEpochMicrosec("") // + .sourceName("") // + .startEpochMicrosec("") // + .timeZoneOffset("") // + .name("") // + .location("") // + .internalLocation("internalLocation") // + .compression("") // + .fileFormatType("") // + .fileFormatVersion("") // + .build(); + } + + @Test + public void notingToConsume() { + doReturn(consumerMock).when(testedObject).createConsumerTask(); + doReturn(Flux.empty()).when(consumerMock).execute(); + + testedObject.scheduleMainDatafileEventTask(); + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase() { + final int noOfEvents = 200; + final int noOfFilesPerEvent = 200; + final int noOfFiles = noOfEvents * noOfFilesPerEvent; + + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(noOfFiles) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_fetchFailedOnce() { + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + Mono<Object> error = Mono.error(new Exception("problem")); + + // First file collect will fail, 3 will succeed + doReturn(error, collectedFile, collectedFile, collectedFile) // + .when(fileCollectorMock) // + .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class)); + + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_publishFailedOnce() { + + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + + Mono<Object> error = Mono.error(new Exception("problem")); + // One publish will fail, the rest will succeed + doReturn(collectedFile, error, collectedFile, collectedFile) // + .when(dataRouterMock) // + .execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(3) // 3 completed files + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + @Test + public void consume_successfulCase_sameFileNames() { + final int noOfEvents = 1; + final int noOfFilesPerEvent = 100; + + // 100 files with the same name + Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false); + doReturn(fileReadyMessages).when(consumerMock).execute(); + + Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData()); + doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull()); + doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull()); + + StepVerifier.create(testedObject.createMainTask()).expectSubscription() // + .expectNextCount(1) // 99 is skipped + .expectComplete() // + .verify(); // + + assertEquals(0, testedObject.getCurrentNumberOfTasks()); + verify(consumerMock, times(1)).execute(); + verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull()); + verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull()); + verifyNoMoreInteractions(dataRouterMock); + verifyNoMoreInteractions(fileCollectorMock); + verifyNoMoreInteractions(consumerMock); + } + + +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 10c5b167..804b46e9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest { private static final String PWD = "pwd"; private static final String FTPES_LOCATION = FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private static final String FTPES_LOCATION_NO_PORT = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION; + private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; @@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private FileData createFileData() { + private FileData createFileData(String location) { // @formatter:off return ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) { // @formatter:off return ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) @@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); @@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest { // @formatter:off FileData fileData = ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest { public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION); doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest { doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); |