/*-
* ============LICENSE_START======================================================================
* Copyright (C) 2018, 2020-2022 Nokia. All rights reserved.
* Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
* ===============================================================================================
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END========================================================================
*/
package org.onap.dcaegen2.collectors.datafile.configuration;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Holds all configuration for the DFC.
 *
 * <p>The configuration is initially read from a local JSON file (see {@link #setFilepath(String)})
 * and is afterwards refreshed from the updates delivered by a {@link CbsClient}. Accessors for the
 * individual configuration parts are {@code synchronized} together with the method that replaces
 * them, so a reader observes the parts of one configuration update.
 *
 * @author Przemysław Wąsala on 3/23/18
 * @author Henrik Andersson
 */
@Component
@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
@EnableConfigurationProperties
@ConfigurationProperties("app")
public class AppConfig {

    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);

    @Value("#{systemEnvironment}")
    Properties systemEnvironment;

    private ConsumerConfiguration dmaapConsumerConfiguration;
    /** Publisher configurations keyed by change identifier; {@code null} until a configuration is loaded. */
    private Map<String, PublisherConfiguration> publishingConfigurations;
    private CertificateConfig certificateConfiguration;
    private SftpConfig sftpConfiguration;
    /** Subscription of the running refresh task, or {@code null} when no refresh is active. */
    private Disposable refreshConfigTask = null;

    @NotEmpty
    private String filepath;

    /**
     * Sets the path of the local configuration file read by {@link #loadConfigurationFromFile()}.
     *
     * @param filepath path to the local JSON configuration file.
     */
    public synchronized void setFilepath(String filepath) {
        this.filepath = filepath;
    }

    /**
     * Reads the cloud configuration.
     *
     * <p>Any refresh task already running is stopped, the local configuration file is loaded, and a
     * new refresh task is subscribed to. Each refresh, a terminating error, and completion of the
     * refresh stream are logged.
     */
    public void initialize() {
        stop();

        loadConfigurationFromFile();

        refreshConfigTask = createRefreshTask() //
            .subscribe(e -> logger.info("Refreshed configuration data"),
                throwable -> logger.error("Configuration refresh terminated due to exception", throwable),
                () -> logger.error("Configuration refresh terminated"));
    }

    /**
     * Builds the stream that creates a CBS client, requests configuration updates from it and
     * applies every received configuration to this object.
     *
     * <p>An error anywhere in the stream is logged by {@link #onErrorResume(Throwable)} and replaced
     * by an empty stream, so the returned {@code Flux} then completes instead of failing.
     *
     * @return a stream emitting this instance each time a configuration update has been processed.
     */
    Flux<AppConfig> createRefreshTask() {
        return createCbsClientConfiguration() //
            .flatMap(this::createCbsClient) //
            .flatMapMany(this::periodicConfigurationUpdates) //
            .map(this::parseCloudConfig) //
            .onErrorResume(this::onErrorResume);
    }

    /**
     * Requests the complete configuration from the given client, passing a zero initial delay and a
     * refresh period of one minute.
     *
     * @param cbsClient the client to fetch configuration updates from.
     * @return the stream of configuration objects delivered by the client.
     */
    private Flux<JsonObject> periodicConfigurationUpdates(CbsClient cbsClient) {
        final Duration initialDelay = Duration.ZERO;
        final Duration refreshPeriod = Duration.ofMinutes(1);

        final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
        return cbsClient.updates(getConfigRequest, initialDelay, refreshPeriod);
    }

    /**
     * Stops the refreshing of the configuration.
     *
     * <p>Does nothing if no refresh task is active.
     */
    public void stop() {
        if (refreshConfigTask != null) {
            refreshConfigTask.dispose();
            refreshConfigTask = null;
        }
    }

    /**
     * Gets the DMaaP consumer configuration.
     *
     * @return the most recently stored consumer configuration, or {@code null} if none has been loaded.
     */
    public synchronized ConsumerConfiguration getDmaapConsumerConfiguration() {
        return dmaapConsumerConfiguration;
    }

    /**
     * Checks if there is a configuration for the given feed.
     *
     * @param changeIdentifier the change identifier the feed is configured to belong to.
     * @return true if a feed is configured for the given change identifier, false if not or if no
     *         configuration has been loaded yet.
     */
    public synchronized boolean isFeedConfigured(String changeIdentifier) {
        // The map is null until the first configuration has been stored; treat that as "not configured"
        // instead of failing with a NullPointerException.
        return publishingConfigurations != null && publishingConfigurations.containsKey(changeIdentifier);
    }

    /**
     * Gets the feed configuration for the given change identifier.
     *
     * @param changeIdentifier the change identifier the feed is configured to belong to.
     * @return the {@code PublisherConfiguration} for the feed belonging to the given change identifier.
     * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given
     *         change identifier.
     */
    public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
        throws DatafileTaskException {

        if (publishingConfigurations == null) {
            throw new DatafileTaskException("No PublishingConfiguration loaded, changeIdentifier: " + changeIdentifier);
        }
        PublisherConfiguration cfg = publishingConfigurations.get(changeIdentifier);
        if (cfg == null) {
            throw new DatafileTaskException(
                "Cannot find getPublishingConfiguration for changeIdentifier: " + changeIdentifier);
        }
        return cfg;
    }

    /**
     * Gets the certificate configuration.
     *
     * @return the most recently stored certificate configuration, or {@code null} if none has been loaded.
     */
    public synchronized CertificateConfig getCertificateConfiguration() {
        return certificateConfiguration;
    }

    /**
     * Gets the sFTP configuration.
     *
     * @return the most recently stored sFTP configuration, or {@code null} if none has been loaded.
     */
    public synchronized SftpConfig getSftpConfiguration() {
        return sftpConfiguration;
    }

    /**
     * Error fallback for the refresh stream: logs the failure and continues with an empty stream.
     *
     * @param throwable the error raised in the refresh stream.
     * @return an empty {@code Mono}.
     */
    private Mono<AppConfig> onErrorResume(Throwable throwable) {
        String throwableString = throwable.toString();
        logger.error("Could not refresh application configuration {}", throwableString);
        return Mono.empty();
    }

    /**
     * Creates the CBS client configuration from the environment.
     *
     * @return a {@code Mono} with the configuration, or an error {@code Mono} carrying the
     *         {@link CbsClientConfigurationException} if the configuration could not be created.
     */
    Mono<CbsClientConfiguration> createCbsClientConfiguration() {
        try {
            return Mono.just(CbsClientConfiguration.fromEnvironment());
        } catch (CbsClientConfigurationException e) {
            return Mono.error(e);
        }
    }

    /**
     * Creates a CBS client for the given client configuration.
     *
     * @param cbsClientConfiguration the configuration to create the client from.
     * @return the result of {@link CbsClientFactory#createCbsClient(CbsClientConfiguration)}.
     */
    Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) {
        return CbsClientFactory.createCbsClient(cbsClientConfiguration);
    }

    /**
     * Parses the given configuration object and stores the result in this instance.
     *
     * <p>A {@link DatafileTaskException} raised while parsing or storing is logged and not
     * propagated.
     *
     * @param configurationObject the JSON configuration to parse.
     * @return this instance.
     */
    private AppConfig parseCloudConfig(JsonObject configurationObject) {
        try {
            CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment);
            setConfiguration(parser.getConsumerConfiguration(), parser.getDmaapPublisherConfigurations(),
                parser.getCertificateConfig(), parser.getSftpConfig());
            logConfig();
        } catch (DatafileTaskException e) {
            logger.error("Could not parse configuration {}", e.toString(), e);
        }
        return this;
    }

    /** Logs the currently stored configuration parts at debug level. */
    private void logConfig() {
        logger.debug("Read and parsed sFTP configuration: [{}]", sftpConfiguration);
        logger.debug("Read and parsed FTPes / HTTPS configuration: [{}]", certificateConfiguration);
        logger.debug("Read and parsed DMaaP configuration: [{}]", dmaapConsumerConfiguration);
        logger.debug("Read and parsed Publish configuration: [{}]", publishingConfigurations);
    }

    /**
     * Loads the configuration from the local file given by {@code filepath}.
     *
     * <p>This is best effort: if the file cannot be read, is not valid JSON, or its root element is
     * not a JSON object, the problem is logged at trace level and the stored configuration is left
     * as it was.
     */
    void loadConfigurationFromFile() {
        try (InputStream inputStream = createInputStream(filepath)) {
            JsonElement rootElement = getJsonElement(inputStream);
            // getAsJsonObject() throws IllegalStateException for a non-object root (e.g. an empty file
            // parses to JsonNull), so the type is checked up front and reported as a syntax problem.
            if (rootElement == null || !rootElement.isJsonObject()) {
                throw new JsonSyntaxException("Root is not a json object");
            }
            parseCloudConfig(rootElement.getAsJsonObject());
            logger.info("Local configuration file loaded: {}", filepath);
        } catch (JsonSyntaxException | IOException e) {
            logger.trace("Local configuration file not loaded: {}", filepath, e);
        }
    }

    /**
     * Replaces all stored configuration parts and, when certificate authentication is enabled,
     * sets up or updates the HTTPS client with the certificate settings.
     *
     * @param consumerConfiguration the DMaaP consumer configuration.
     * @param publisherConfiguration the publisher configurations keyed by change identifier.
     * @param certificateConfig the certificate configuration.
     * @param sftpConfig the sFTP configuration.
     * @throws DatafileTaskException declared by the HTTPS client setup call; note that the fields
     *         have already been replaced when it is thrown.
     */
    private synchronized void setConfiguration(@NotNull ConsumerConfiguration consumerConfiguration,
        @NotNull Map<String, PublisherConfiguration> publisherConfiguration,
        @NotNull CertificateConfig certificateConfig, @NotNull SftpConfig sftpConfig)
        throws DatafileTaskException {

        this.dmaapConsumerConfiguration = consumerConfiguration;
        this.publishingConfigurations = publisherConfiguration;
        this.certificateConfiguration = certificateConfig;
        this.sftpConfiguration = sftpConfig;

        if (!certificateConfig.enableCertAuth()) {
            logger.debug("External TLS certificate disabled, skipping setup HTTPS client");
            return;
        }
        HttpsClientConnectionManagerUtil.setupOrUpdate(certificateConfig.keyCert(), certificateConfig.keyPasswordPath(),
            certificateConfig.trustedCa(), certificateConfig.trustedCaPasswordPath(),
            certificateConfig.httpsHostnameVerify());
    }

    /**
     * Parses the content of the given stream as JSON.
     *
     * @param inputStream the stream to read; it is decoded as UTF-8 and is not closed by this method.
     * @return the parsed root element.
     */
    JsonElement getJsonElement(InputStream inputStream) {
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        return JsonParser.parseReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    }

    /**
     * Opens a buffered stream on the given file.
     *
     * @param filepath path of the file to open.
     * @return a buffered input stream on the file; the caller is responsible for closing it.
     * @throws IOException if the file cannot be opened.
     */
    InputStream createInputStream(@NotNull String filepath) throws IOException {
        return new BufferedInputStream(new FileInputStream(filepath));
    }
}