summaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src')
-rw-r--r--datafile-app-server/src/main/docker/Dockerfile41
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java271
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java39
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java144
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java20
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java59
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java6
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java19
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java3
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java145
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java105
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java8
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java64
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java69
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java285
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java27
17 files changed, 764 insertions, 556 deletions
diff --git a/datafile-app-server/src/main/docker/Dockerfile b/datafile-app-server/src/main/docker/Dockerfile
new file mode 100644
index 00000000..6ab30cc2
--- /dev/null
+++ b/datafile-app-server/src/main/docker/Dockerfile
@@ -0,0 +1,41 @@
+#
+# ============LICENSE_START=======================================================
+# Copyright (C) 2019 Nordix Foundation.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=========================================================
+#
+FROM openjdk:8-jre-alpine
+
+WORKDIR /opt/app/datafile
+RUN mkdir -p /var/log/ONAP
+
+ADD /target/datafile-app-server.jar /opt/app/datafile/
+
+ADD /config/application.yaml /opt/app/datafile/config/
+ADD /config/cacerts /opt/app/datafile/config/
+ADD /config/datafile_endpoints.json /opt/app/datafile/config/
+ADD /config/ftpKey.jks /opt/app/datafile/config/
+ADD /config/keystore /opt/app/datafile/config/
+
+EXPOSE 8100 8433
+
+RUN addgroup -S onap && adduser -S datafile -G onap
+RUN chown -R datafile:onap /opt/app/datafile
+RUN chown -R datafile:onap /var/log/ONAP
+
+USER datafile
+
+ENTRYPOINT ["/usr/bin/java", "-jar", "/opt/app/datafile/datafile-app-server.jar"] \ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 5bbacb14..40de33dd 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -20,14 +20,21 @@ package org.onap.dcaegen2.collectors.datafile.configuration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
+import java.io.*;
+import java.util.ServiceLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
-
-import java.util.Optional;
-import java.util.function.Predicate;
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -35,199 +42,95 @@ import java.util.function.Predicate;
*/
@Component
-@Configuration
-public class AppConfig extends DatafileAppConfig {
-
- private static Predicate<String> isEmpty = String::isEmpty;
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}")
- public String consumerDmaapHostName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapPortNumber:}")
- public Integer consumerDmaapPortNumber;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapTopicName:}")
- public String consumerDmaapTopicName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapProtocol:}")
- public String consumerDmaapProtocol;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserName:}")
- public String consumerDmaapUserName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserPassword:}")
- public String consumerDmaapUserPassword;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapContentType:}")
- public String consumerDmaapContentType;
-
- @Value("${dmaap.dmaapConsumerConfiguration.consumerId:}")
- public String consumerId;
-
- @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}")
- public String consumerGroup;
-
- @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}")
- public Integer consumerTimeoutMs;
-
- @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}")
- public Integer consumerMessageLimit;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapHostName:}")
- public String producerDmaapHostName;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapPortNumber:}")
- public Integer producerDmaapPortNumber;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapTopicName:}")
- public String producerDmaapTopicName;
+@EnableConfigurationProperties
+@ConfigurationProperties("app")
+public class AppConfig {
- @Value("${dmaap.dmaapProducerConfiguration.dmaapProtocol:}")
- public String producerDmaapProtocol;
+ private static final String CONFIG = "configs";
+ private static final String DMAAP = "dmaap";
+ private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
+ private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
+ private static final String FTP = "ftp";
+ private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
+ private static final String SECURITY = "security";
+ private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- @Value("${dmaap.dmaapProducerConfiguration.dmaapUserName:}")
- public String producerDmaapUserName;
+ DmaapConsumerConfiguration dmaapConsumerConfiguration;
- @Value("${dmaap.dmaapProducerConfiguration.dmaapUserPassword:}")
- public String producerDmaapUserPassword;
+ DmaapPublisherConfiguration dmaapPublisherConfiguration;
- @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}")
- public String producerDmaapContentType;
+ FtpesConfig ftpesConfig;
- @Value("${ftp.ftpesConfiguration.keyCert:}")
- public String keyCert;
+ @NotEmpty
+ private String filepath;
- @Value("${ftp.ftpesConfiguration.keyPassword:}")
- public String keyPassword;
-
- @Value("${ftp.ftpesConfiguration.trustedCA:}")
- public String trustedCA;
+ public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ return dmaapConsumerConfiguration;
+ }
- @Value("${ftp.ftpesConfiguration.trustedCAPassword:}")
- public String trustedCAPassword;
+ public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ return dmaapPublisherConfiguration;
+ }
- @Value("${security.trustStorePath:}")
- public String trustStorePath;
+ public FtpesConfig getFtpesConfiguration() {
+ return ftpesConfig;
+ }
- @Value("${security.trustStorePasswordPath:}")
- public String trustStorePasswordPath;
+ public void initFileStreamReader() {
+
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ JsonParser parser = new JsonParser();
+ JsonObject jsonObject;
+ try (InputStream inputStream = getInputStream(filepath)) {
+ JsonElement rootElement = getJsonElement(parser, inputStream);
+ if (rootElement.isJsonObject()) {
+ jsonObject = rootElement.getAsJsonObject();
+ ftpesConfig = deserializeType(gsonBuilder,
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
+ FtpesConfig.class);
+ dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapConsumerConfiguration.class);
+
+ dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapPublisherConfiguration.class);
+ }
+ } catch (IOException e) {
+ logger.error("Problem with file loading, file: {}", filepath, e);
+ } catch (JsonSyntaxException e) {
+ logger.error("Problem with Json deserialization", e);
+ }
+ }
- @Value("${security.keyStorePath:}")
- public String keyStorePath;
+ JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
+ return parser.parse(new InputStreamReader(inputStream));
+ }
- @Value("${security.keyStorePasswordPath:}")
- public String keyStorePasswordPath;
+ private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
+ @NotNull Class<T> type) {
+ return gsonBuilder.create().fromJson(jsonObject, type);
+ }
- @Value("${security.enableDmaapCertAuth:}")
- public Boolean enableDmaapCertAuth;
+ InputStream getInputStream(@NotNull String filepath) throws IOException {
+ return new BufferedInputStream(new FileInputStream(filepath));
+ }
- @Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapUserPassword(
- Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapUserPassword()))
- .dmaapUserName(
- Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapUserName()))
- .dmaapHostName(
- Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapHostName()))
- .dmaapPortNumber(
- Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.dmaapPortNumber()))
- .dmaapProtocol(
- Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapProtocol()))
- .dmaapContentType(
- Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapContentType()))
- .dmaapTopicName(
- Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapTopicName()))
- .messageLimit(
- Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.messageLimit()))
- .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.timeoutMs()))
- .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.consumerGroup()))
- .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.consumerId()))
- .trustStorePath(
- Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.trustStorePath()))
- .trustStorePasswordPath(
- Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.trustStorePasswordPath()))
- .keyStorePath(
- Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.keyStorePath()))
- .keyStorePasswordPath(
- Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.keyStorePasswordPath()))
- .enableDmaapCertAuth(
- Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.enableDmaapCertAuth()))
- .build();
+ String getFilepath() {
+ return this.filepath;
}
- @Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType(
- Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapContentType()))
- .dmaapHostName(
- Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapHostName()))
- .dmaapPortNumber(
- Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapPublisherConfiguration.dmaapPortNumber()))
- .dmaapProtocol(
- Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapProtocol()))
- .dmaapTopicName(
- Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapTopicName()))
- .dmaapUserName(
- Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapUserName()))
- .dmaapUserPassword(
- Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapUserPassword()))
- .trustStorePath(
- Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.trustStorePath()))
- .trustStorePasswordPath(
- Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.trustStorePasswordPath()))
- .keyStorePath(
- Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.keyStorePath()))
- .keyStorePasswordPath(
- Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.keyStorePasswordPath()))
- .enableDmaapCertAuth(
- Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapPublisherConfiguration.enableDmaapCertAuth()))
- .build();
+ public void setFilepath(String filepath) {
+ this.filepath = filepath;
}
- @Override
- public FtpesConfig getFtpesConfiguration() {
- return new ImmutableFtpesConfig.Builder()
- .keyCert(
- Optional.ofNullable(keyCert).filter(isEmpty.negate())
- .orElse(ftpesConfig.keyCert()))
- .keyPassword(
- Optional.ofNullable(keyPassword).filter(isEmpty.negate())
- .orElse(ftpesConfig.keyPassword()))
- .trustedCA(
- Optional.ofNullable(trustedCA).filter(isEmpty.negate())
- .orElse(ftpesConfig.trustedCA()))
- .trustedCAPassword(
- Optional.ofNullable(trustedCAPassword).filter(isEmpty.negate())
- .orElse(ftpesConfig.trustedCAPassword()))
- .build();
+ private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
+ source.entrySet()
+ .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+ return target;
}
+
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java
deleted file mode 100644
index 7fe2561c..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/Config.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public interface Config {
-
- DmaapConsumerConfiguration getDmaapConsumerConfiguration();
-
- DmaapPublisherConfiguration getDmaapPublisherConfiguration();
-
- FtpesConfig getFtpesConfiguration();
-
- void initFileStreamReader();
-
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
deleted file mode 100644
index 59bb259d..00000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ServiceLoader;
-
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Configuration
-@EnableConfigurationProperties
-@ConfigurationProperties("app")
-public abstract class DatafileAppConfig implements Config {
-
- private static final String CONFIG = "configs";
- private static final String DMAAP = "dmaap";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String FTP = "ftp";
- private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
- private static final String SECURITY = "security";
- private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- FtpesConfig ftpesConfig;
-
- @NotEmpty
- private String filepath;
-
-
- @Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerConfiguration;
- }
-
- @Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
- }
-
- @Override
- public FtpesConfig getFtpesConfiguration() {
- return ftpesConfig;
- }
-
- @Override
- public void initFileStreamReader() {
-
- GsonBuilder gsonBuilder = new GsonBuilder();
- ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
- JsonObject jsonObject;
- try (InputStream inputStream = getInputStream(filepath)) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- jsonObject = rootElement.getAsJsonObject();
- ftpesConfig = deserializeType(gsonBuilder,
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
- FtpesConfig.class);
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
-
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
- }
- } catch (IOException e) {
- logger.error("Problem with file loading, file: {}", filepath, e);
- } catch (JsonSyntaxException e) {
- logger.error("Problem with Json deserialization", e);
- }
- }
-
- JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
- return parser.parse(new InputStreamReader(inputStream));
- }
-
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- return gsonBuilder.create().fromJson(jsonObject, type);
- }
-
- InputStream getInputStream(@NotNull String filepath) throws IOException {
- return new BufferedInputStream(new FileInputStream(filepath));
- }
-
- String getFilepath() {
- return this.filepath;
- }
-
- public void setFilepath(String filepath) {
- this.filepath = filepath;
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 478ae309..bc21f96c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -21,7 +21,9 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
import javax.annotation.PostConstruct;
+
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
@@ -29,6 +31,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
+
import io.swagger.annotations.ApiOperation;
import reactor.core.publisher.Mono;
@@ -37,10 +40,11 @@ import reactor.core.publisher.Mono;
*/
@Configuration
@EnableScheduling
-public class SchedulerConfig extends DatafileAppConfig {
+public class SchedulerConfig {
- private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = Duration.ofSeconds(15);
+ private static final Duration SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = Duration.ofMinutes(5);
+ private static final Duration SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE = Duration.ofHours(1);
private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
@@ -77,11 +81,13 @@ public class SchedulerConfig extends DatafileAppConfig {
@ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
if (scheduledFutureList.isEmpty()) {
- scheduledFutureList.add(taskScheduler
- .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
- Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
+ scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
+ SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask,
- Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS)));
+ SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+ scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+ SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+
return true;
} else {
return false;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3c606deb..a8f79ea1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -123,12 +123,11 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transformMessages(monoJsonP));
+ return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
+ : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
@@ -138,22 +137,22 @@ public class JsonMessageParser {
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
// @formatter:off
- return Flux.just(ImmutableFileReadyMessage.builder()
+ return Mono.just(ImmutableFileReadyMessage.builder()
.pnfName(messageMetaData.sourceName())
.messageMetaData(messageMetaData)
.files(allFileDataFromJson)
.build());
// @formatter:on
} else {
- return Flux.empty();
+ return Mono.empty();
}
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
new file mode 100644
index 00000000..2cb84112
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A cache of all files that already has been published. Key is the local file path and the value is
+ * a time stamp, when the key was last used.
+ */
+public class PublishedFileCache {
+ private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
+
+ public Instant put(Path path) {
+ return publishedFiles.put(path, Instant.now());
+ }
+
+ public void remove(Path localFileName) {
+ publishedFiles.remove(localFileName);
+ }
+
+ public void purge(Instant now) {
+ for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<Path, Instant> pair = it.next();
+ if (isCachedPublishedFileOutdated(now, pair.getValue())) {
+ it.remove();
+ }
+ }
+ }
+
+ public int size() {
+ return publishedFiles.size();
+ }
+
+ private boolean isCachedPublishedFileOutdated(Instant now, Instant then) {
+ final int timeToKeepInfoInSeconds = 60 * 60 * 24;
+ return now.getEpochSecond() - then.getEpochSecond() > timeToKeepInfoInSeconds;
+ }
+
+
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
index c41dce5b..f6daf733 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -22,7 +22,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
@@ -31,9 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consume
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -41,7 +39,7 @@ import reactor.core.publisher.Mono;
public class DMaaPMessageConsumerTask {
private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
- private Config datafileAppConfig;
+ private AppConfig datafileAppConfig;
private JsonMessageParser jsonMessageParser;
private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index b65ddd63..4c0dcce5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -19,7 +19,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks;
import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
@@ -28,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -37,7 +36,7 @@ import reactor.core.publisher.Flux;
public class DataRouterPublisher {
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
- private final Config datafileAppConfig;
+ private final AppConfig datafileAppConfig;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -51,27 +50,27 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
- return Flux.just(model)
- .cache(1)
+ return Mono.just(model)
+ .cache()
.flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Flux.just(model);
+ return Mono.just(model);
} else {
- logger.warn("Publish to DR unsuccessful, response code: " + response);
- return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ logger.warn("Publish to DR unsuccessful, response code: {}", response);
+ return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index db18ac2a..0b647bf5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,7 +20,6 @@ import java.nio.file.Path;
import java.time.Duration;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
@@ -41,7 +40,7 @@ import reactor.core.publisher.Mono;
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private Config datafileAppConfig;
+ private AppConfig datafileAppConfig;
private final FtpsClient ftpsClient;
private final SftpClient sftpClient;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index f22c7bf9..783c699c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -20,11 +20,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -34,6 +32,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.PublishedFileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,22 +40,23 @@ import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ * This implements the main flow of the data file collector. Fetch file ready events from the
+ * message router, fetch new files from the PNF publish these in the data router.
*/
@Component
public class ScheduledTasks {
private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
+ private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
- /** Data needed for fetching of files from one PNF */
+ /** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
- // event
+ final FileCollector collectorTask;
final MessageMetaData metaData;
FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
@@ -68,15 +68,15 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private final AppConfig applicationConfiguration;
- private final AtomicInteger taskCounter = new AtomicInteger();
- private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
+ private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
+ private final Scheduler scheduler =
+ Schedulers.newElastic("DataFileCollector", MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS);
+ PublishedFileCache alreadyPublishedFiles = new PublishedFileCache();
/**
* Constructor for task registration in Datafile Workflow.
*
* @param applicationConfiguration - application configuration
- * @param xnfCollectorTask - second task
- * @param dmaapPublisherTask - third task
*/
@Autowired
public ScheduledTasks(AppConfig applicationConfiguration) {
@@ -84,52 +84,67 @@ public class ScheduledTasks {
}
/**
- * Main function for scheduling Datafile Workflow.
+ * Main function for scheduling for the file collection Workflow.
*/
public void scheduleMainDatafileEventTask() {
logger.trace("Execution of tasks was registered");
applicationConfiguration.initFileStreamReader();
- //@formatter:off
- consumeMessagesFromDmaap()
- .parallel() // Each FileReadyMessage in a separate thread
- .runOn(Schedulers.parallel())
- .flatMap(this::createFileCollectionTask)
- .filter(this::shouldBePublished)
- .doOnNext(fileData -> taskCounter.incrementAndGet())
- .flatMap(this::collectFileFromXnf)
- .flatMap(this::publishToDataRouter)
- .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
- .doOnNext(model -> taskCounter.decrementAndGet())
- .sequential()
- .subscribe(this::onSuccess, this::onError, this::onComplete);
- //@formatter:on
+ createMainTask().subscribe(this::onSuccess, this::onError, this::onComplete);
+ }
+
+ Flux<ConsumerDmaapModel> createMainTask() {
+ return fetchMoreFileReadyMessages() //
+ .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
+ .runOn(scheduler) //
+ .flatMap(this::createFileCollectionTask) //
+ .filter(this::shouldBePublished) //
+ .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+ .flatMap(this::collectFileFromXnf) //
+ .flatMap(this::publishToDataRouter) //
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) //
+ .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
+ .sequential();
+ }
+
+ /**
+ * called in regular intervals to remove out-dated cached information
+ */
+ public void purgeCachedInformation(Instant now) {
+ alreadyPublishedFiles.purge(now);
}
private void onComplete() {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(Path localFile) {
- logger.info("Datafile consumed tasks." + localFile);
+ private void onSuccess(ConsumerDmaapModel model) {
+ logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
private void onError(Throwable throwable) {
logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
}
+ private int getParallelism() {
+ if (MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks() > 0) {
+ return MAX_NUMBER_OF_CONCURRENT_TASKS - getCurrentNumberOfTasks();
+ } else {
+ return 1; // We need at least one rail/thread
+ }
+ }
+
private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- FileCollector task = new FileCollector(applicationConfiguration,
- new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
- fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
+ fileCollects.add(
+ new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
private boolean shouldBePublished(FileCollectionData task) {
- return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
+ return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
}
private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
@@ -138,48 +153,49 @@ public class ScheduledTasks {
return fileCollect.collectorTask
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
- logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
- deleteFile(fileData.getLocalFileName());
- alreadyPublishedFiles.remove(fileData.getLocalFileName());
- taskCounter.decrementAndGet();
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ Path localFileName = fileData.getLocalFileName();
+ logger.error("File fetching failed: {}", localFileName);
+ deleteFile(localFileName);
+ alreadyPublishedFiles.remove(localFileName);
+ currentNumberOfTasks.decrementAndGet();
return Mono.empty();
}
- private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
- DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+ DataRouterPublisher publisherTask = createDataRouterPublisher();
return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
.onErrorResume(exception -> handlePublishFailure(model, exception));
-
}
- private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
- taskCounter.decrementAndGet();
- return Flux.empty();
+ currentNumberOfTasks.decrementAndGet();
+ return Mono.empty();
}
- private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
- final int currentNumberOfTasks = taskCounter.get();
- logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
- if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+ /**
+ * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+ */
+ private Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
+ logger.trace("Consuming new file ready messages, current number of tasks: {}", getCurrentNumberOfTasks());
+ if (getCurrentNumberOfTasks() > MAX_NUMBER_OF_CONCURRENT_TASKS) {
return Flux.empty();
}
- final DMaaPMessageConsumerTask messageConsumerTask =
- new DMaaPMessageConsumerTask(this.applicationConfiguration);
- return messageConsumerTask.execute()
- .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ return createConsumerTask() //
+ .execute() //
+ .onErrorResume(this::handleConsumeMessageFailure);
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
@@ -187,13 +203,30 @@ public class ScheduledTasks {
return Flux.empty();
}
- private Flux<Path> deleteFile(Path localFile) {
+ private void deleteFile(Path localFile) {
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
} catch (Exception e) {
- logger.warn("Could not delete file: {}, {}", localFile, e);
+ logger.trace("Could not delete file: {}", localFile);
}
- return Flux.just(localFile);
}
+
+ int getCurrentNumberOfTasks() {
+ return currentNumberOfTasks.get();
+ }
+
+ DMaaPMessageConsumerTask createConsumerTask() {
+ return new DMaaPMessageConsumerTask(this.applicationConfiguration);
+ }
+
+ FileCollector createFileCollector(FileData fileData) {
+ return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
+ new SftpClient(fileData.fileServerData()));
+ }
+
+ DataRouterPublisher createDataRouterPublisher() {
+ return new DataRouterPublisher(applicationConfiguration);
+ }
+
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 2cd854af..443ddae7 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -46,33 +46,32 @@ import org.onap.dcaegen2.collectors.datafile.integration.junit5.mockito.MockitoE
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
@ExtendWith({MockitoExtension.class})
-class DatafileAppConfigTest {
-
+class AppConfigTest {
+
private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
private static final boolean CORRECT_JSON = true;
private static final boolean INCORRECT_JSON = false;
- private static DatafileAppConfig datafileAppConfig;
- private static AppConfig appConfig;
+ private static AppConfig appConfigUnderTest;
+
private static String filePath = Objects
- .requireNonNull(DatafileAppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
+ .requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
- datafileAppConfig = spy(DatafileAppConfig.class);
- appConfig = spy(new AppConfig());
+ appConfigUnderTest = spy(AppConfig.class);
}
@Test
public void whenApplicationWasStarted_FilePathIsSet() {
// When
- datafileAppConfig.setFilepath(filePath);
+ appConfigUnderTest.setFilepath(filePath);
// Then
- verify(datafileAppConfig, times(1)).setFilepath(anyString());
- verify(datafileAppConfig, times(0)).initFileStreamReader();
- Assertions.assertEquals(filePath, datafileAppConfig.getFilepath());
+ verify(appConfigUnderTest, times(1)).setFilepath(anyString());
+ verify(appConfigUnderTest, times(0)).initFileStreamReader();
+ Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
}
@Test
@@ -82,23 +81,23 @@ class DatafileAppConfigTest {
new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
// When
- datafileAppConfig.setFilepath(filePath);
- doReturn(inputStream).when(datafileAppConfig).getInputStream(any());
- datafileAppConfig.initFileStreamReader();
- appConfig.dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
- appConfig.dmaapPublisherConfiguration = datafileAppConfig.getDmaapPublisherConfiguration();
- appConfig.ftpesConfig = datafileAppConfig.getFtpesConfiguration();
+ appConfigUnderTest.setFilepath(filePath);
+ doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
+ appConfigUnderTest.initFileStreamReader();
+ appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
+ appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
+ appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
// Then
- verify(datafileAppConfig, times(1)).setFilepath(anyString());
- verify(datafileAppConfig, times(1)).initFileStreamReader();
- Assertions.assertNotNull(datafileAppConfig.getDmaapConsumerConfiguration());
- Assertions.assertNotNull(datafileAppConfig.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfig.getDmaapPublisherConfiguration(),
- datafileAppConfig.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfig.getDmaapConsumerConfiguration(),
- datafileAppConfig.getDmaapConsumerConfiguration());
- Assertions.assertEquals(appConfig.getFtpesConfiguration(), datafileAppConfig.getFtpesConfiguration());
+ verify(appConfigUnderTest, times(1)).setFilepath(anyString());
+ verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
+ appConfigUnderTest.getDmaapPublisherConfiguration());
+ Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
+ appConfigUnderTest.getDmaapConsumerConfiguration());
+ Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
}
@@ -106,17 +105,17 @@ class DatafileAppConfigTest {
public void whenFileIsNotExist_ThrowIoException() {
// Given
filePath = "/temp.json";
- datafileAppConfig.setFilepath(filePath);
+ appConfigUnderTest.setFilepath(filePath);
// When
- datafileAppConfig.initFileStreamReader();
+ appConfigUnderTest.initFileStreamReader();
// Then
- verify(datafileAppConfig, times(1)).setFilepath(anyString());
- verify(datafileAppConfig, times(1)).initFileStreamReader();
- Assertions.assertNull(datafileAppConfig.getDmaapConsumerConfiguration());
- Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration());
- Assertions.assertNull(datafileAppConfig.getFtpesConfiguration());
+ verify(appConfigUnderTest, times(1)).setFilepath(anyString());
+ verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@@ -127,16 +126,16 @@ class DatafileAppConfigTest {
new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
// When
- datafileAppConfig.setFilepath(filePath);
- doReturn(inputStream).when(datafileAppConfig).getInputStream(any());
- datafileAppConfig.initFileStreamReader();
+ appConfigUnderTest.setFilepath(filePath);
+ doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
+ appConfigUnderTest.initFileStreamReader();
// Then
- verify(datafileAppConfig, times(1)).setFilepath(anyString());
- verify(datafileAppConfig, times(1)).initFileStreamReader();
- Assertions.assertNotNull(datafileAppConfig.getDmaapConsumerConfiguration());
- Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration());
- Assertions.assertNotNull(datafileAppConfig.getFtpesConfiguration());
+ verify(appConfigUnderTest, times(1)).setFilepath(anyString());
+ verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getFtpesConfiguration());
}
@@ -147,22 +146,22 @@ class DatafileAppConfigTest {
InputStream inputStream =
new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
// When
- datafileAppConfig.setFilepath(filePath);
- doReturn(inputStream).when(datafileAppConfig).getInputStream(any());
+ appConfigUnderTest.setFilepath(filePath);
+ doReturn(inputStream).when(appConfigUnderTest).getInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
- doReturn(jsonElement).when(datafileAppConfig).getJsonElement(any(JsonParser.class), any(InputStream.class));
- datafileAppConfig.initFileStreamReader();
- appConfig.dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
- appConfig.dmaapPublisherConfiguration = datafileAppConfig.getDmaapPublisherConfiguration();
- appConfig.ftpesConfig = datafileAppConfig.getFtpesConfiguration();
+ doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
+ appConfigUnderTest.initFileStreamReader();
+ appConfigUnderTest.dmaapConsumerConfiguration = appConfigUnderTest.getDmaapConsumerConfiguration();
+ appConfigUnderTest.dmaapPublisherConfiguration = appConfigUnderTest.getDmaapPublisherConfiguration();
+ appConfigUnderTest.ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
// Then
- verify(datafileAppConfig, times(1)).setFilepath(anyString());
- verify(datafileAppConfig, times(1)).initFileStreamReader();
- Assertions.assertNull(datafileAppConfig.getDmaapConsumerConfiguration());
- Assertions.assertNull(datafileAppConfig.getDmaapPublisherConfiguration());
- Assertions.assertNull(datafileAppConfig.getFtpesConfiguration());
+ verify(appConfigUnderTest, times(1)).setFilepath(anyString());
+ verify(appConfigUnderTest, times(1)).initFileStreamReader();
+ Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
private String getJsonConfig(boolean correct) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java
index 05a4f515..0d5ea003 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ServiceMockProvider.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PROJECT
* ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.collectors.datafile.integration;
import static org.mockito.Mockito.mock;
-import org.onap.dcaegen2.collectors.datafile.configuration.DatafileAppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -34,8 +34,8 @@ import org.springframework.context.annotation.Configuration;
class ServiceMockProvider {
@Bean
- public DatafileAppConfig getDatafileAppConfig() {
- return mock(DatafileAppConfig.class);
+ public AppConfig getDatafileAppConfig() {
+ return mock(AppConfig.class);
}
@Bean
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
new file mode 100644
index 00000000..7b38ee42
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCacheTest.java
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.service;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class PublishedFileCacheTest {
+
+ private static PublishedFileCache testObject;
+
+ @BeforeAll
+ public static void setUp() {
+ testObject = new PublishedFileCache();
+ }
+
+ @Test
+ public void purgeFiles_timeNotExpired() {
+ Assertions.assertNull(testObject.put(Paths.get("A")));
+ Assertions.assertNotNull(testObject.put(Paths.get("A")));
+ testObject.put(Paths.get("B"));
+
+ testObject.purge(Instant.now());
+ Assertions.assertEquals(2, testObject.size());
+ }
+
+ @Test
+ public void purgeFiles_timeExpired() {
+ testObject.put(Paths.get("A"));
+ testObject.put(Paths.get("B"));
+ testObject.put(Paths.get("C"));
+
+ testObject.purge(Instant.MAX);
+ Assertions.assertEquals(0, testObject.size());
+ }
+
+ @Test
+ public void purgeFiles_remove() {
+ Path path = Paths.get("A");
+ testObject.put(path);
+ Assertions.assertEquals(1, testObject.size());
+ testObject.remove(path);
+ Assertions.assertEquals(0, testObject.size());
+ }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 73511d19..24b82fe6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
@@ -61,42 +61,43 @@ class DataRouterPublisherTest {
@BeforeAll
public static void setUp() {
- //@formatter:off
+
dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234)
- .dmaapProtocol("https")
- .dmaapUserName("DFC")
- .dmaapUserPassword("DFC")
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT")
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
- .build();
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME)
- .internalLocation("target/" + PM_FILE_NAME)
- .compression("gzip")
- .fileFormatType("org.3GPP.32.435#measCollec")
- .fileFormatVersion("V10")
- .build();
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
+ .internalLocation("target/" + PM_FILE_NAME) //
+ .compression("gzip") //
+ .fileFormatType("org.3GPP.32.435#measCollec") //
+ .fileFormatVersion("V10") //
+ .build(); //
appConfig = mock(AppConfig.class);
- //@formatter:on
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
}
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -107,7 +108,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -118,7 +119,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectErrorMessage("Retries exhausted: 1/1").verify();
@@ -128,11 +129,11 @@ class DataRouterPublisherTest {
}
@SafeVarargs
- final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
+ final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+
dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
new file mode 100644
index 00000000..0662216b
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -0,0 +1,285 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+public class ScheduledTasksTest {
+
+ private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+
+ private AppConfig appConfig = mock(AppConfig.class);
+ private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
+
+ private int uniqueValue = 0;
+ private DMaaPMessageConsumerTask consumerMock;
+ private FileCollector fileCollectorMock;
+ private DataRouterPublisher dataRouterMock;
+
+ @BeforeEach
+ private void setUp() {
+ DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234) //
+ .dmaapProtocol("https") //
+ .dmaapUserName("DFC") //
+ .dmaapUserPassword("DFC") //
+ .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build(); //
+
+ consumerMock = mock(DMaaPMessageConsumerTask.class);
+ fileCollectorMock = mock(FileCollector.class);
+ dataRouterMock = mock(DataRouterPublisher.class);
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(fileCollectorMock).when(testedObject).createFileCollector(notNull());
+ doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
+ }
+
+ private MessageMetaData messageMetaData() {
+ return ImmutableMessageMetaData.builder() //
+ .productName("productName") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .changeIdentifier("") //
+ .changeType("") //
+ .build();
+ }
+
+ private FileData fileData(int instanceNumber) {
+ return ImmutableFileData.builder() //
+ .name("name" + instanceNumber) //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .location("ftpes://192.168.0.101/ftp/rop/" + PM_FILE_NAME + instanceNumber) //
+ .scheme(Scheme.FTPS) //
+ .compression("") //
+ .build();
+ }
+
+ private List<FileData> files(int size, boolean uniqueNames) {
+ List<FileData> list = new LinkedList<FileData>();
+ for (int i = 0; i < size; ++i) {
+ if (uniqueNames) {
+ ++uniqueValue;
+ }
+ list.add(fileData(uniqueValue));
+ }
+ return list;
+ }
+
+ private FileReadyMessage createFileReadyMessage(int numberOfFiles, boolean uniqueNames) {
+ MessageMetaData md = messageMetaData();
+ return ImmutableFileReadyMessage.builder().pnfName(md.sourceName()).messageMetaData(md)
+ .files(files(numberOfFiles, uniqueNames)).build();
+ }
+
+ private Flux<FileReadyMessage> fileReadyMessageFlux(int numberOfEvents, int filesPerEvent, boolean uniqueNames) {
+ List<FileReadyMessage> list = new LinkedList<FileReadyMessage>();
+ for (int i = 0; i < numberOfEvents; ++i) {
+ list.add(createFileReadyMessage(filesPerEvent, uniqueNames));
+ }
+ return Flux.fromIterable(list);
+ }
+
+ private ConsumerDmaapModel consumerData() {
+ return ImmutableConsumerDmaapModel //
+ .builder() //
+ .productName("") //
+ .vendorName("") //
+ .lastEpochMicrosec("") //
+ .sourceName("") //
+ .startEpochMicrosec("") //
+ .timeZoneOffset("") //
+ .name("") //
+ .location("") //
+ .internalLocation("internalLocation") //
+ .compression("") //
+ .fileFormatType("") //
+ .fileFormatVersion("") //
+ .build();
+ }
+
+ @Test
+ public void notingToConsume() {
+ doReturn(consumerMock).when(testedObject).createConsumerTask();
+ doReturn(Flux.empty()).when(consumerMock).execute();
+
+ testedObject.scheduleMainDatafileEventTask();
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase() {
+ final int noOfEvents = 200;
+ final int noOfFilesPerEvent = 200;
+ final int noOfFiles = noOfEvents * noOfFilesPerEvent;
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(noOfFiles) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(noOfFiles)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(noOfFiles)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_fetchFailedOnce() {
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ Mono<Object> error = Mono.error(new Exception("problem"));
+
+ // First file collect will fail, 3 will succeed
+ doReturn(error, collectedFile, collectedFile, collectedFile) //
+ .when(fileCollectorMock) //
+ .execute(any(FileData.class), any(MessageMetaData.class), anyLong(), any(Duration.class));
+
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) //
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(3)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_publishFailedOnce() {
+
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+
+ Mono<Object> error = Mono.error(new Exception("problem"));
+ // One publish will fail, the rest will succeed
+ doReturn(collectedFile, error, collectedFile, collectedFile) //
+ .when(dataRouterMock) //
+ .execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(3) // 3 completed files
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(4)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(4)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+ @Test
+ public void consume_successfulCase_sameFileNames() {
+ final int noOfEvents = 1;
+ final int noOfFilesPerEvent = 100;
+
+ // 100 files with the same name
+ Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
+ doReturn(fileReadyMessages).when(consumerMock).execute();
+
+ Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
+ doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull());
+ doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull());
+
+ StepVerifier.create(testedObject.createMainTask()).expectSubscription() //
+ .expectNextCount(1) // 99 is skipped
+ .expectComplete() //
+ .verify(); //
+
+ assertEquals(0, testedObject.getCurrentNumberOfTasks());
+ verify(consumerMock, times(1)).execute();
+ verify(fileCollectorMock, times(1)).execute(notNull(), notNull(), anyLong(), notNull());
+ verify(dataRouterMock, times(1)).execute(notNull(), anyLong(), notNull());
+ verifyNoMoreInteractions(dataRouterMock);
+ verifyNoMoreInteractions(fileCollectorMock);
+ verifyNoMoreInteractions(consumerMock);
+ }
+
+
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 10c5b167..804b46e9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest {
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private static final String FTPES_LOCATION_NO_PORT =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private FileData createFileData() {
+ private FileData createFileData(String location) {
// @formatter:off
return ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) {
// @formatter:off
return ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
@@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
@@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:off
FileData fileData = ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest {
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION);
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest {
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();