/*- * ============LICENSE_START======================================================= * Copyright (C) 2018, 2020-2022 Nokia. All rights reserved. * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import javax.validation.constraints.NotNull; import io.vavr.collection.Stream; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore; import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Parses the cloud configuration. * * @author Przemysław Wąsala on 9/19/18 * @author Henrik Andersson */ public class CloudConfigParser { private static final String DMAAP_SECURITY_TRUST_STORE_PATH = "dmaap.security.trustStorePath"; private static final String DMAAP_SECURITY_TRUST_STORE_PASS_PATH = "dmaap.security.trustStorePasswordPath"; private static final String DMAAP_SECURITY_KEY_STORE_PATH = "dmaap.security.keyStorePath"; private static final String DMAAP_SECURITY_KEY_STORE_PASS_PATH = "dmaap.security.keyStorePasswordPath"; private static final String DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH = "dmaap.security.enableDmaapCertAuth"; private static final String CONFIG = "config"; private static final String KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY = "KNOWN_HOSTS_FILE_PATH"; private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING = "sftp.security.strictHostKeyChecking"; private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup"; private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId"; private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs"; private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1; private static final int FIRST_SOURCE_INDEX = 0; private static final Logger logger = LoggerFactory.getLogger(CloudConfigParser.class); private final Properties systemEnvironment; private final JsonObject jsonObject; public CloudConfigParser(JsonObject jsonObject, Properties systemEnvironment) { this.jsonObject = jsonObject.getAsJsonObject(CONFIG); this.systemEnvironment = systemEnvironment; } /** * Get the publisher configurations. * * @return a map with change identifier as key and the connected publisher configuration as value. * @throws DatafileTaskException if a member of the configuration is missing. */ public @NotNull Map getDmaapPublisherConfigurations() throws DatafileTaskException { JsonObject producerCfgs = jsonObject.get("streams_publishes").getAsJsonObject(); Iterator changeIdentifierList = producerCfgs.keySet().iterator(); Map result = new HashMap<>(); while (changeIdentifierList.hasNext()) { String changeIdentifier = changeIdentifierList.next(); JsonObject producerCfg = getAsJson(producerCfgs, changeIdentifier); JsonObject feedConfig = get(producerCfg, "dmaap_info").getAsJsonObject(); PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // .password(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // .trustStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH,"")) // .trustStorePasswordPath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH, "")) // .keyStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH,"")) // .keyStorePasswordPath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH,"")) // .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // .changeIdentifier(changeIdentifier) // .logUrl(getAsString(feedConfig, "log_url")) // .build(); result.put(cfg.changeIdentifier(), cfg); } return result; } /** * Get the consumer configuration. * * @return the consumer configuration. * @throws DatafileTaskException if the configuration is invalid. */ public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException { try { MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig(); MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest(); MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig); return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest); } catch (Exception e) { throw new DatafileTaskException("Could not parse message router consumer configuration", e); } } private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException { return ImmutableMessageRouterSubscriberConfig.builder() .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null) .build(); } private SecurityKeys createSecurityKeys() throws DatafileTaskException { return ImmutableSecurityKeys.builder() .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))) .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))) .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))) .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))) .build(); } private boolean isDmaapCertAuthEnabled(JsonObject config) { return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean(); } private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException { Stream> sources = DataStreams.namedSources(jsonObject); if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) { throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources); } RawDataStream source = sources.get(FIRST_SOURCE_INDEX); MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); return ImmutableMessageRouterSubscribeRequest.builder() .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP)) .sourceDefinition(parsedSource) .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID)) .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong())) .build(); } /** * Get the sFTP configuration. * * @return the sFTP configuration. * @throws DatafileTaskException if a member of the configuration is missing. */ public @NotNull SftpConfig getSftpConfig() throws DatafileTaskException { String filePath = determineKnownHostsFilePath(); return new ImmutableSftpConfig.Builder() // .strictHostKeyChecking(getAsBoolean(jsonObject, CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING)) .knownHostsFilePath(filePath).build(); } /** * Get the security configuration for communication with the xNF. * * @return the xNF communication security configuration. * @throws DatafileTaskException if a member of the configuration is missing. */ public @NotNull CertificateConfig getCertificateConfig() throws DatafileTaskException { boolean enableCertAuth = getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.enableCertAuth", Boolean.TRUE); String keyCert = ""; String keyPasswordPath = ""; String trustedCa = ""; String trustedCaPasswordPath = ""; boolean httpsHostnameVerify = true; if (enableCertAuth) { logger.debug("TlS enabled, attempt to read certificates property"); try { keyCert = getAsString(jsonObject, "dmaap.certificateConfig.keyCert"); keyPasswordPath = getAsString(jsonObject, "dmaap.certificateConfig.keyPasswordPath"); trustedCa = getAsString(jsonObject, "dmaap.certificateConfig.trustedCa"); trustedCaPasswordPath = getAsString(jsonObject, "dmaap.certificateConfig.trustedCaPasswordPath"); httpsHostnameVerify = getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.httpsHostnameVerify", Boolean.TRUE); } catch (DatafileTaskException e) { throw new DatafileTaskException( "Wrong configuration. External certificate enabled but configs are missing: " + e.getMessage()); } } return new ImmutableCertificateConfig.Builder() // .keyCert(keyCert) .keyPasswordPath(keyPasswordPath) .trustedCa(trustedCa) .trustedCaPasswordPath(trustedCaPasswordPath) // .httpsHostnameVerify(httpsHostnameVerify) .enableCertAuth(enableCertAuth) .build(); } private String determineKnownHostsFilePath() { String filePath = ""; if (systemEnvironment != null) { filePath = systemEnvironment.getProperty(KNOWN_HOSTS_FILE_PATH_ENV_PROPERTY, "/home/datafile/.ssh/known_hosts"); } return filePath; } private static @NotNull JsonElement get(JsonObject obj, String memberName) throws DatafileTaskException { JsonElement elem = obj.get(memberName); if (elem == null) { throw new DatafileTaskException("Could not find member: " + memberName + " in: " + obj); } return elem; } private static @NotNull String getAsString(JsonObject obj, String memberName) throws DatafileTaskException { return get(obj, memberName).getAsString(); } private static String getAsOptionalStringOrDefault(JsonObject obj, String memberName, String def) { try { return get(obj, memberName).getAsString(); } catch (DatafileTaskException e) { return def; } } private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException { return get(obj, memberName).getAsBoolean(); } private static @NotNull Boolean getAsBooleanOrDefault(JsonObject obj, String memberName, Boolean def) { try { return get(obj, memberName).getAsBoolean(); } catch (DatafileTaskException e) { return def; } } private static @NotNull JsonObject getAsJson(JsonObject obj, String memberName) throws DatafileTaskException { return get(obj, memberName).getAsJsonObject(); } private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException { return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath)); } }