aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java101
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java18
4 files changed, 89 insertions, 87 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 40de33dd..82c390f7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -2,39 +2,44 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import java.io.*;
-import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -55,49 +60,53 @@ public class AppConfig {
private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- FtpesConfig ftpesConfig;
+ private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+ private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private FtpesConfig ftpesConfiguration;
@NotEmpty
private String filepath;
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
- public FtpesConfig getFtpesConfiguration() {
- return ftpesConfig;
+ public synchronized FtpesConfig getFtpesConfiguration() {
+ return ftpesConfiguration;
}
- public void initFileStreamReader() {
+ public void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
JsonObject jsonObject;
- try (InputStream inputStream = getInputStream(filepath)) {
+ try (InputStream inputStream = createInputStream(filepath)) {
JsonElement rootElement = getJsonElement(parser, inputStream);
if (rootElement.isJsonObject()) {
jsonObject = rootElement.getAsJsonObject();
- ftpesConfig = deserializeType(gsonBuilder,
+ FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
FtpesConfig.class);
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_CONSUMER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapConsumerConfiguration.class);
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_PRODUCER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapPublisherConfiguration.class);
+
+ setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
} catch (IOException e) {
logger.error("Problem with file loading, file: {}", filepath, e);
@@ -106,30 +115,36 @@ public class AppConfig {
}
}
+ synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
+ DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.dmaapPublisherConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
+
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
+ @NotNull Class<T> type) {
return gsonBuilder.create().fromJson(jsonObject, type);
}
- InputStream getInputStream(@NotNull String filepath) throws IOException {
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- String getFilepath() {
+ synchronized String getFilepath() {
return this.filepath;
}
- public void setFilepath(String filepath) {
+ public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+ source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
return target;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 208e8a43..0254597e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
@@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig {
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
- private void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized void parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
@@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
@Override
- public FtpesConfig getFtpesConfiguration() {
+ public synchronized FtpesConfig getFtpesConfiguration() {
return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index a0020318..af4670e3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,22 +41,16 @@ import reactor.core.publisher.Mono;
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private AppConfig datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
+ private final AppConfig datafileAppConfig;
-
- public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsClient;
- this.sftpClient = sftpClient;
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
Duration firstBackoffTimeout, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
@@ -65,18 +60,6 @@ public class FileCollector {
//@formatter:on
}
- private FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
@@ -85,11 +68,8 @@ public class FileCollector {
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
- try {
+ try (FileCollectClient currentClient = createClient(fileData)) {
localFile.getParent().toFile().mkdir(); // Create parent directories
-
- FileCollectClient currentClient = selectClient(fileData);
-
currentClient.collectFile(remoteFile, localFile);
return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
} catch (Exception throwable) {
@@ -98,12 +78,12 @@ public class FileCollector {
}
}
- private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
switch (fileData.scheme()) {
case SFTP:
- return sftpClient;
+ return createSftpClient(fileData);
case FTPS:
- return ftpsClient;
+ return createFtpsClient(fileData);
default:
throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
}
@@ -129,4 +109,17 @@ public class FileCollector {
.build();
// @formatter:on
}
+
+ SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
+ SftpClient client = new SftpClient(fileData.fileServerData());
+ client.open();
+ return client;
+ }
+
+ FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
+ FtpsClient client = new FtpsClient(fileData.fileServerData());
+ client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
+ return client;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 89ebde8f..28963377 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -57,12 +55,10 @@ public class ScheduledTasks {
/** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask;
final MessageMetaData metaData;
- FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ FileCollectionData(FileData fd, MessageMetaData metaData) {
this.fileData = fd;
- this.collectorTask = collectorTask;
this.metaData = metaData;
}
}
@@ -90,7 +86,7 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
- applicationConfiguration.initFileStreamReader();
+ applicationConfiguration.loadConfigurationFromFile();
createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
() -> onComplete(contextMap));
}
@@ -143,8 +139,7 @@ public class ScheduledTasks {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- fileCollects.add(
- new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
+ fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -159,7 +154,7 @@ public class ScheduledTasks {
final Duration initialRetryTimeout = Duration.ofSeconds(5);
MdcVariables.setMdcContextMap(contextMap);
- return fileCollect.collectorTask
+ return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
@@ -236,9 +231,8 @@ public class ScheduledTasks {
return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- FileCollector createFileCollector(FileData fileData) {
- return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
+ FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
}
DataRouterPublisher createDataRouterPublisher() {