diff options
Diffstat (limited to 'datafile-app-server/src/main')
4 files changed, 89 insertions, 87 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index 40de33dd..82c390f7 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -2,39 +2,44 @@ * ============LICENSE_START====================================================================== * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. * ============LICENSE_END======================================================================== */ package org.onap.dcaegen2.collectors.datafile.configuration; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.TypeAdapterFactory; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ServiceLoader; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import java.io.*; -import java.util.ServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.stereotype.Component; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -55,49 +60,53 @@ public class AppConfig { private static final String SECURITY = "security"; private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); - DmaapConsumerConfiguration dmaapConsumerConfiguration; - - DmaapPublisherConfiguration dmaapPublisherConfiguration; - - FtpesConfig ftpesConfig; + private DmaapConsumerConfiguration dmaapConsumerConfiguration; + private DmaapPublisherConfiguration dmaapPublisherConfiguration; + private FtpesConfig ftpesConfiguration; @NotEmpty private String filepath; - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return dmaapConsumerConfiguration; } - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return dmaapPublisherConfiguration; } - public FtpesConfig getFtpesConfiguration() { - return ftpesConfig; + public synchronized FtpesConfig getFtpesConfiguration() { + return ftpesConfiguration; } - public void initFileStreamReader() { + public void loadConfigurationFromFile() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); JsonParser parser = new JsonParser(); JsonObject jsonObject; - try (InputStream inputStream = getInputStream(filepath)) { + try (InputStream inputStream = createInputStream(filepath)) { JsonElement rootElement = getJsonElement(parser, inputStream); if (rootElement.isJsonObject()) { jsonObject = rootElement.getAsJsonObject(); - ftpesConfig = deserializeType(gsonBuilder, + FtpesConfig ftpesConfig = deserializeType(gsonBuilder, jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(FTP).getAsJsonObject(FTPES_CONFIGURATION), FtpesConfig.class); - dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapConsumerConfiguration consumerConfiguration = deserializeType(gsonBuilder, + concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) + .getAsJsonObject(DMAAP_CONSUMER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), DmaapConsumerConfiguration.class); - dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP).getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), + DmaapPublisherConfiguration publisherConfiguration = deserializeType(gsonBuilder, + concatenateJsonObjects( + jsonObject.getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) + .getAsJsonObject(DMAAP_PRODUCER), + rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), DmaapPublisherConfiguration.class); + + setConfiguration(consumerConfiguration, publisherConfiguration, ftpesConfig); } } catch (IOException e) { logger.error("Problem with file loading, file: {}", filepath, e); @@ -106,30 +115,36 @@ public class AppConfig { } } + synchronized void setConfiguration(DmaapConsumerConfiguration consumerConfiguration, + DmaapPublisherConfiguration publisherConfiguration, FtpesConfig ftpesConfig) { + this.dmaapConsumerConfiguration = consumerConfiguration; + this.dmaapPublisherConfiguration = publisherConfiguration; + this.ftpesConfiguration = ftpesConfig; + } + JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { return parser.parse(new InputStreamReader(inputStream)); } private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { + @NotNull Class<T> type) { return gsonBuilder.create().fromJson(jsonObject, type); } - InputStream getInputStream(@NotNull String filepath) throws IOException { + InputStream createInputStream(@NotNull String filepath) throws IOException { return new BufferedInputStream(new FileInputStream(filepath)); } - String getFilepath() { + synchronized String getFilepath() { return this.filepath; } - public void setFilepath(String filepath) { + public synchronized void setFilepath(String filepath) { this.filepath = filepath; } private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet() - .forEach(entry -> target.add(entry.getKey(), entry.getValue())); + source.entrySet().forEach(entry -> target.add(entry.getKey(), entry.getValue())); return target; } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java index 208e8a43..0254597e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -56,7 +56,7 @@ public class CloudConfiguration extends AppConfig { private Properties systemEnvironment; @Autowired - public void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { + public synchronized void setThreadPoolTaskScheduler(ReactiveCloudConfigurationProvider reactiveCloudConfigurationProvider) { this.reactiveCloudConfigurationProvider = reactiveCloudConfigurationProvider; } @@ -80,7 +80,7 @@ public class CloudConfiguration extends AppConfig { .subscribe(this::parseCloudConfig, this::cloudConfigError); } - private void parseCloudConfig(JsonObject jsonObject) { + private synchronized void parseCloudConfig(JsonObject jsonObject) { logger.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); @@ -89,17 +89,17 @@ public class CloudConfiguration extends AppConfig { } @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + public synchronized DmaapPublisherConfiguration getDmaapPublisherConfiguration() { return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); } @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + public synchronized DmaapConsumerConfiguration getDmaapConsumerConfiguration() { return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); } @Override - public FtpesConfig getFtpesConfiguration() { + public synchronized FtpesConfig getFtpesConfiguration() { return Optional.ofNullable(ftpesCloudConfiguration).orElse(super.getFtpesConfiguration()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java index a0020318..af4670e3 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java @@ -17,6 +17,7 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.util.Map; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -40,22 +41,16 @@ import reactor.core.publisher.Mono; public class FileCollector { private static final Logger logger = LoggerFactory.getLogger(FileCollector.class); - private AppConfig datafileAppConfig; - private final FtpsClient ftpsClient; - private final SftpClient sftpClient; + private final AppConfig datafileAppConfig; - - public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) { + public FileCollector(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; - this.ftpsClient = ftpsClient; - this.sftpClient = sftpClient; } public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries, Duration firstBackoffTimeout, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Entering execute with {}", fileData); - resolveKeyStore(); //@formatter:off return Mono.just(fileData) @@ -65,18 +60,6 @@ public class FileCollector { //@formatter:on } - private FtpesConfig resolveConfiguration() { - return datafileAppConfig.getFtpesConfiguration(); - } - - private void resolveKeyStore() { - FtpesConfig ftpesConfig = resolveConfiguration(); - ftpsClient.setKeyCertPath(ftpesConfig.keyCert()); - ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword()); - ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA()); - ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword()); - } - private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData, Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); @@ -85,11 +68,8 @@ public class FileCollector { final String remoteFile = fileData.remoteFilePath(); final Path localFile = fileData.getLocalFileName(); - try { + try (FileCollectClient currentClient = createClient(fileData)) { localFile.getParent().toFile().mkdir(); // Create parent directories - - FileCollectClient currentClient = selectClient(fileData); - currentClient.collectFile(remoteFile, localFile); return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile)); } catch (Exception throwable) { @@ -98,12 +78,12 @@ public class FileCollector { } } - private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException { + private FileCollectClient createClient(FileData fileData) throws DatafileTaskException { switch (fileData.scheme()) { case SFTP: - return sftpClient; + return createSftpClient(fileData); case FTPS: - return ftpsClient; + return createFtpsClient(fileData); default: throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme()); } @@ -129,4 +109,17 @@ public class FileCollector { .build(); // @formatter:on } + + SftpClient createSftpClient(FileData fileData) throws DatafileTaskException { + SftpClient client = new SftpClient(fileData.fileServerData()); + client.open(); + return client; + } + + FtpsClient createFtpsClient(FileData fileData) throws DatafileTaskException { + FtpesConfig config = datafileAppConfig.getFtpesConfiguration(); + FtpsClient client = new FtpsClient(fileData.fileServerData()); + client.open(config.keyCert(), config.keyPassword(), Paths.get(config.trustedCA()), config.trustedCAPassword()); + return client; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 89ebde8f..28963377 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -26,8 +26,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; -import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; @@ -57,12 +55,10 @@ public class ScheduledTasks { /** Data needed for fetching of one file */ private class FileCollectionData { final FileData fileData; - final FileCollector collectorTask; final MessageMetaData metaData; - FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) { + FileCollectionData(FileData fd, MessageMetaData metaData) { this.fileData = fd; - this.collectorTask = collectorTask; this.metaData = metaData; } } @@ -90,7 +86,7 @@ public class ScheduledTasks { public void scheduleMainDatafileEventTask(Map<String, String> contextMap) { MdcVariables.setMdcContextMap(contextMap); logger.trace("Execution of tasks was registered"); - applicationConfiguration.initFileStreamReader(); + applicationConfiguration.loadConfigurationFromFile(); createMainTask(contextMap).subscribe(model -> onSuccess(model, contextMap), thr -> onError(thr, contextMap), () -> onComplete(contextMap)); } @@ -143,8 +139,7 @@ public class ScheduledTasks { List<FileCollectionData> fileCollects = new ArrayList<>(); for (FileData fileData : availableFiles.files()) { - fileCollects.add( - new FileCollectionData(fileData, createFileCollector(fileData), availableFiles.messageMetaData())); + fileCollects.add(new FileCollectionData(fileData, availableFiles.messageMetaData())); } return Flux.fromIterable(fileCollects); } @@ -159,7 +154,7 @@ public class ScheduledTasks { final Duration initialRetryTimeout = Duration.ofSeconds(5); MdcVariables.setMdcContextMap(contextMap); - return fileCollect.collectorTask + return createFileCollector() .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout, contextMap) .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap)); @@ -236,9 +231,8 @@ public class ScheduledTasks { return new DMaaPMessageConsumerTask(this.applicationConfiguration); } - FileCollector createFileCollector(FileData fileData) { - return new FileCollector(applicationConfiguration, new FtpsClient(fileData.fileServerData()), - new SftpClient(fileData.fileServerData())); + FileCollector createFileCollector() { + return new FileCollector(applicationConfiguration); } DataRouterPublisher createDataRouterPublisher() { |