aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2019-03-14 14:27:30 +0000
committerPatrikBuhr <patrik.buhr@est.tech>2019-03-14 14:27:30 +0000
commitad4a3a514bd943df22a2e27d78f0706d412ebe9f (patch)
treee13648b6efbb335bf8e4f5f7a853615f8626d30e /datafile-app-server/src/main/java
parenta89e09eecabd035f1c227b1ae3f5fa59eff36be4 (diff)
Thread safety issues
The TrustManager is now loaded and initialized once in a thread safe way (instead of each time it is used). Removed some unneeded wrappers. Using AutoCloseable for FTP clients to make sure they are closed in case of exceptions. Made AppConfig thread safe. Change-Id: Ia6a2c8a76bf960013180fdd7c53ae0ff17b26505 Issue-ID: DCAEGEN2-1118 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'datafile-app-server/src/main/java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java101
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java10
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java47
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java18
4 files changed, 89 insertions, 87 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 40de33dd..82c390f7 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -2,39 +2,44 @@
* ============LICENSE_START======================================================================
* Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import java.io.*;
-import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
-import javax.validation.constraints.NotEmpty;
-import javax.validation.constraints.NotNull;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -55,49 +60,53 @@ public class AppConfig {
private static final String SECURITY = "security";
private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- FtpesConfig ftpesConfig;
+ private DmaapConsumerConfiguration dmaapConsumerConfiguration;
+ private DmaapPublisherConfiguration dmaapPublisherConfiguration;
+ private FtpesConfig ftpesConfiguration;
@NotEmpty
private String filepath;
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return dmaapConsumerConfiguration;
}
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return dmaapPublisherConfiguration;
}
- public FtpesConfig getFtpesConfiguration() {
- return ftpesConfig;
+ public synchronized FtpesConfig getFtpesConfiguration() {
+ return ftpesConfiguration;
}
- public void initFileStreamReader() {
+ public void loadConfigurationFromFile() {
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
JsonParser parser = new JsonParser();
JsonObject jsonObject;
- try (InputStream inputStream = getInputStream(filepath)) {
+ try (InputStream inputStream = createInputStream(filepath)) {
JsonElement rootElement = getJsonElement(parser, inputStream);
if (rootElement.isJsonObject()) {
jsonObject = rootElement.getAsJsonObject();
- ftpesConfig = deserializeType(gsonBuilder,
+ FtpesConfig ftpesConfig = deserializeType(gsonBuilder,
jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION),
FtpesConfig.class);
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_CONSUMER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapConsumerConfiguration.class);
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
+ DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder,
+ concatenateJsonObjects(
+ jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
+ .getAsJsonObject(DMAAP_PRODUCER),
+ rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
DmaapPublisherConfiguration.class);
+
+ setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig);
}
} catch (IOException e) {
logger.error("Problem with file loading, file: {}", filepath, e);
@@ -106,30 +115,36 @@ public class AppConfig {
}
}
+ synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration,
+ DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) {
+ this.dmaapConsumerConfiguration = consumerConfiguration;
+ this.dmaapPublisherConfiguration = publisherConfiguration;
+ this.ftpesConfiguration = ftpesConfig;
+ }
+
JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
return parser.parse(new InputStreamReader(inputStream));
}
private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
+ @NotNull Class<T> type) {
return gsonBuilder.create().fromJson(jsonObject, type);
}
- InputStream getInputStream(@NotNull String filepath) throws IOException {
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
return new BufferedInputStream(new FileInputStream(filepath));
}
- String getFilepath() {
+ synchronized String getFilepath() {
return this.filepath;
}
- public void setFilepath(String filepath) {
+ public synchronized void setFilepath(String filepath) {
this.filepath = filepath;
}
private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
+ source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue()));
return target;
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
index 208e8a43..0254597e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java
@@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig {
private Properties systemEnvironment;
@Autowired
- public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
+ public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) {
this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider;
}
@@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig {
.subscribe(this::parseCloudConfig, this::cloudConfigError);
}
- private void parseCloudConfig(JsonObject jsonObject) {
+ private synchronized void parseCloudConfig(JsonObject jsonObject) {
logger.info("Received application configuration: {}", jsonObject);
CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
@@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig {
}
@Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+ public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
}
@Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+ public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
}
@Override
- public FtpesConfig getFtpesConfiguration() {
+ public synchronized FtpesConfig getFtpesConfiguration() {
return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration());
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index a0020318..af4670e3 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -17,6 +17,7 @@
package org.onap.dcaegen2.collectors.datafile.tasks;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -40,22 +41,16 @@ import reactor.core.publisher.Mono;
public class FileCollector {
private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
- private AppConfig datafileAppConfig;
- private final FtpsClient ftpsClient;
- private final SftpClient sftpClient;
+ private final AppConfig datafileAppConfig;
-
- public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+ public FileCollector(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
- this.ftpsClient = ftpsClient;
- this.sftpClient = sftpClient;
}
public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
Duration firstBackoffTimeout, Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Entering execute with {}", fileData);
- resolveKeyStore();
//@formatter:off
return Mono.just(fileData)
@@ -65,18 +60,6 @@ public class FileCollector {
//@formatter:on
}
- private FtpesConfig resolveConfiguration() {
- return datafileAppConfig.getFtpesConfiguration();
- }
-
- private void resolveKeyStore() {
- FtpesConfig ftpesConfig = resolveConfiguration();
- ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
- ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
- ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
- ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
- }
-
private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
@@ -85,11 +68,8 @@ public class FileCollector {
final String remoteFile = fileData.remoteFilePath();
final Path localFile = fileData.getLocalFileName();
- try {
+ try (FileCollectClient currentClient = createClient(fileData)) {
localFile.getParent().toFile().mkdir(); // Create parent directories
-
- FileCollectClient currentClient = selectClient(fileData);
-
currentClient.collectFile(remoteFile, localFile);
return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
} catch (Exception throwable) {
@@ -98,12 +78,12 @@ public class FileCollector {
}
}
- private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+ private FileCollectClient createClient(FileData fileData) throws DatafileTaskException {
switch (fileData.scheme()) {
case SFTP:
- return sftpClient;
+ return createSftpClient(fileData);
case FTPS:
- return ftpsClient;
+ return createFtpsClient(fileData);
default:
throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
}
@@ -129,4 +109,17 @@ public class FileCollector {
.build();
// @formatter:on
}
+
+ SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
+ SftpClient client = new SftpClient(fileData.fileServerData());
+ client.open();
+ return client;
+ }
+
+ FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException {
+ FtpesConfig config = datafileAppConfig.getFtpesConfiguration();
+ FtpsClient client = new FtpsClient(fileData.fileServerData());
+ client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword());
+ return client;
+ }
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 89ebde8f..28963377 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -57,12 +55,10 @@ public class ScheduledTasks {
/** Data needed for fetching of one file */
private class FileCollectionData {
final FileData fileData;
- final FileCollector collectorTask;
final MessageMetaData metaData;
- FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+ FileCollectionData(FileData fd, MessageMetaData metaData) {
this.fileData = fd;
- this.collectorTask = collectorTask;
this.metaData = metaData;
}
}
@@ -90,7 +86,7 @@ public class ScheduledTasks {
public void scheduleMainDatafileEventTask(Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Execution of tasks was registered");
- applicationConfiguration.initFileStreamReader();
+ applicationConfiguration.loadConfigurationFromFile();
createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap),
() -> onComplete(contextMap));
}
@@ -143,8 +139,7 @@ public class ScheduledTasks {
List<FileCollectionData> fileCollects = new ArrayList<>();
for (FileData fileData : availableFiles.files()) {
- fileCollects.add(
- new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData()));
+ fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData()));
}
return Flux.fromIterable(fileCollects);
}
@@ -159,7 +154,7 @@ public class ScheduledTasks {
final Duration initialRetryTimeout = Duration.ofSeconds(5);
MdcVariables.setMdcContextMap(contextMap);
- return fileCollect.collectorTask
+ return createFileCollector()
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
contextMap)
.onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
@@ -236,9 +231,8 @@ public class ScheduledTasks {
return new DMaaPMessageConsumerTask(this.applicationConfiguration);
}
- FileCollector createFileCollector(FileData fileData) {
- return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()),
- new SftpClient(fileData.fileServerData()));
+ FileCollector createFileCollector() {
+ return new FileCollector(applicationConfiguration);
}
DataRouterPublisher createDataRouterPublisher() {