diff options
Diffstat (limited to 'datafile-app-server/src/main/java')
12 files changed, 172 insertions, 403 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index c257ceed..d933e337 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -41,8 +41,9 @@ import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticConte import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +72,12 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); + @Value("#{systemEnvironment}") + Properties systemEnvironment; private ConsumerConfiguration dmaapConsumerConfiguration; private Map<String, PublisherConfiguration> publishingConfigurations; private FtpesConfig ftpesConfiguration; private SftpConfig sftpConfiguration; - @Value("#{systemEnvironment}") - Properties systemEnvironment; private Disposable refreshConfigTask = null; @NotEmpty @@ -102,8 +103,8 @@ public class AppConfig { } Flux<AppConfig> createRefreshTask(Map<String, String> context) { - return getEnvironment(systemEnvironment, context) // - .flatMap(this::createCbsClient) // + return createCbsClientConfiguration() + .flatMap(this::createCbsClient) .flatMapMany(this::periodicConfigurationUpdates) // .map(this::parseCloudConfig) // .onErrorResume(this::onErrorResume); @@ -175,19 +176,25 @@ public class AppConfig { return Mono.empty(); } - Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context); + Mono<CbsClientConfiguration> createCbsClientConfiguration() { + try { + return Mono.just(CbsClientConfiguration.fromEnvironment()); + } catch (CbsClientConfigurationException e) { + return Mono.error(e); + } } - Mono<CbsClient> createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); } private AppConfig parseCloudConfig(JsonObject configurationObject) { try { - CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment); - setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), - parser.getFtpesConfig(), parser.getSftpConfig()); + CloudConfigParser parser = + new CloudConfigParser(configurationObject, systemEnvironment); + setConfiguration(parser.getConsumerConfiguration(), + parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig(), + parser.getSftpConfig()); logConfig(); } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); @@ -207,8 +214,7 @@ public class AppConfig { ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); try (InputStream inputStream = createInputStream(filepath)) { - JsonParser parser = new JsonParser(); - JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); + JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } @@ -228,8 +234,8 @@ public class AppConfig { this.sftpConfiguration = sftpConfig; } - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream)); + JsonElement getJsonElement(InputStream inputStream) { + return JsonParser.parseReader(new InputStreamReader(inputStream)); } InputStream createInputStream(@NotNull String filepath) throws IOException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index a86a32b8..6ace4aae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; -import java.util.Set; import javax.validation.constraints.NotNull; +import io.vavr.collection.Stream; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore; +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; /** * Parses the cloud configuration. @@ -51,6 +67,12 @@ public class CloudConfigParser { private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING = "sftp.security.strictHostKeyChecking"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId"; + private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs"; + private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1; + private static final int FIRST_SOURCE_INDEX = 0; + private final Properties systemEnvironment; private final JsonObject jsonObject; @@ -78,7 +100,7 @@ public class CloudConfigParser { PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // - .passWord(getAsString(feedConfig, "password")) // + .password(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) // .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // @@ -98,24 +120,51 @@ public class CloudConfigParser { * Get the consumer configuration. * * @return the consumer configuration. - * @throws DatafileTaskException if a member of the configuration is missing. + * @throws DatafileTaskException if the configuration is invalid. */ - public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { - JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject(); - Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); - if (topics.size() != 1) { - throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics); + public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException { + try { + MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig(); + MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest(); + MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig); + return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest); + } catch (Exception e) { + throw new DatafileTaskException("Could not parse message router consumer configuration", e); } - JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); - String topicUrl = getAsString(dmaapInfo, "topic_url"); - - return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + } + + private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException { + return ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null) + .build(); + } + + private SecurityKeys createSecurityKeys() throws DatafileTaskException { + return ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))) + .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))) + .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))) + .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))) + .build(); + } + + private boolean isDmaapCertAuthEnabled(JsonObject config) { + return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean(); + } + + private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException { + Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject); + if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) { + throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources); + } + RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX); + MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP)) + .sourceDefinition(parsedSource) + .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID)) + .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong())) .build(); } @@ -176,4 +225,8 @@ public class CloudConfigParser { return get(obj, memberName).getAsJsonObject(); } + private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException { + return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath)); + } + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index 4db7963d..89fcf1c2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -1,113 +1,56 @@ /*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at + * ============LICENSE_START======================================================= + * Copyright (C) 2020 NOKIA Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.configuration; -import java.net.MalformedURLException; -import java.net.URL; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; -import org.immutables.gson.Gson; -import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; +public class ConsumerConfiguration { -@Value.Immutable -@Value.Style(redactedMask = "####") -@Gson.TypeAdapters -public abstract class ConsumerConfiguration { - @Value.Redacted - public abstract String topicUrl(); + private final MessageRouterSubscriberConfig messageRouterSubscriberConfig; + private final MessageRouterSubscriber messageRouterSubscriber; + private final MessageRouterSubscribeRequest messageRouterSubscribeRequest; - public abstract String trustStorePath(); - - public abstract String trustStorePasswordPath(); - - public abstract String keyStorePath(); - - public abstract String keyStorePasswordPath(); - - public abstract Boolean enableDmaapCertAuth(); - - /** - * Gets the configuration in the SDK version. - * - * @return a <code>DmaapConsumerConfiguration</code> representing the configuration. - * - * @throws DatafileTaskException if something is wrong with the topic URL. - */ - public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { - try { - URL url = new URL(topicUrl()); - String passwd = ""; - String userName = ""; - if (url.getUserInfo() != null) { - String[] userInfo = url.getUserInfo().split(":"); - userName = userInfo[0]; - passwd = userInfo[1]; - } - String urlPath = url.getPath(); - DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); - - return new ImmutableDmaapConsumerConfiguration.Builder() // - .endpointUrl(topicUrl()) // - .dmaapContentType("application/json") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(path.dmaapTopicName) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(userName) // - .dmaapUserPassword(passwd) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .consumerId(path.consumerId) // - .consumerGroup(path.consumerGroup) // - .timeoutMs(-1) // - .messageLimit(-1) // - .build(); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Could not parse the URL", e); - } + public ConsumerConfiguration(MessageRouterSubscriberConfig messageRouterSubscriberConfig, + MessageRouterSubscriber messageRouterSubscriber, MessageRouterSubscribeRequest messageRouterSubscribeRequest) { + this.messageRouterSubscriberConfig = messageRouterSubscriberConfig; + this.messageRouterSubscriber = messageRouterSubscriber; + this.messageRouterSubscribeRequest = messageRouterSubscribeRequest; } - private class DmaapConsumerUrlPath { - final String dmaapTopicName; - final String consumerGroup; - final String consumerId; - - DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { - this.dmaapTopicName = dmaapTopicName; - this.consumerGroup = consumerGroup; - this.consumerId = consumerId; - } + public MessageRouterSubscriber getMessageRouterSubscriber() { + return messageRouterSubscriber; } - private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { - String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 - if (tokens.length != 5) { - throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); - } + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return messageRouterSubscribeRequest; + } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT - final String consumerGroup = tokens[3]; // OpenDcae-c12 - final String consumerId = tokens[4]; // C12 - return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + public MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() { + return messageRouterSubscriberConfig; } + @Override + public String toString() { + return "ConsumerConfiguration{" + "securityKeys=" + messageRouterSubscriberConfig.securityKeys() + + ", consumerGroup=" + messageRouterSubscribeRequest.consumerGroup() + ", consumerID=" + + messageRouterSubscribeRequest.consumerId() + '}'; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java deleted file mode 100644 index ad5f648d..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import reactor.core.publisher.Mono; - -/** - * Handling the Consul connection. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 - */ -class EnvironmentProcessor { - - private static final int DEFAULT_CONSUL_PORT = 8500; - private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); - - private EnvironmentProcessor() { - } - - static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.trace("Loading configuration from system environment variables"); - EnvProperties envProperties; - try { - envProperties = ImmutableEnvProperties.builder() // - .consulHost(getConsulHost(systemEnvironment)) // - .consulPort(getConsultPort(systemEnvironment)) // - .cbsName(getConfigBindingService(systemEnvironment)) // - .appName(getService(systemEnvironment)) // - .build(); - } catch (EnvironmentLoaderException e) { - return Mono.error(e); - } - logger.trace("Evaluated environment system variables {}", envProperties); - return Mono.just(envProperties); - } - - private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) - .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); - } - - private static Integer getConsultPort(Properties systemEnvironments) { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) // - .map(Integer::valueOf) // - .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); - } - - private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) // - .orElseThrow( - () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); - } - - private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional - .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) - .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) - .orElseThrow(() -> new EnvironmentLoaderException( - "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); - } - - private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); - return DEFAULT_CONSUL_PORT; - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index d7451bdb..fa7d7841 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START====================================================================== * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2020 Nokia. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,13 +17,10 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import java.net.MalformedURLException; -import java.net.URL; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + @Value.Immutable @Value.Style(redactedMask = "####") @@ -36,7 +34,7 @@ public interface PublisherConfiguration { String userName(); @Value.Redacted - String passWord(); + String password(); String trustStorePath(); @@ -50,30 +48,4 @@ public interface PublisherConfiguration { String changeIdentifier(); - /** - * Get the publisher configuration in SDK format. - * - * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration. - * @throws MalformedURLException if the publish URL is malformed. - */ - default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { - URL url = new URL(publishUrl()); - String urlPath = url.getPath(); - - return new ImmutableDmaapPublisherConfiguration.Builder() // - .endpointUrl(publishUrl()) // - .dmaapContentType("application/octet-stream") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(urlPath) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(this.userName()) // - .dmaapUserPassword(this.passWord()) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .build(); - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java deleted file mode 100644 index 5a8806b4..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.ClientRequest; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.ExchangeFilterFunction; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.Builder; -import reactor.core.publisher.Mono; - -/** - * Web client for the DMaaP MessageRouter. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 - */ -public class DmaapWebClient { - - private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class); - - private String contentType; - - /** - * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. - * - * @param dmaapCustomConfig - configuration object - * @return DmaapReactiveWebClient - */ - public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { - this.contentType = dmaapCustomConfig.dmaapContentType(); - return this; - } - - /** - * Construct Reactive WebClient with appropriate settings. - * - * @return WebClient - */ - public WebClient build() { - Builder webClientBuilder = WebClient.builder() // - .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) // - .filter(getRequestFilter()) // - .filter(getResponseFilter()); - return webClientBuilder.build(); - } - - private ExchangeFilterFunction getResponseFilter() { - return ExchangeFilterFunction.ofResponseProcessor(this::logResponse); - } - - Mono<ClientResponse> logResponse(ClientResponse clientResponse) { - MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); - logger.trace("Response Status {}", clientResponse.statusCode()); - MDC.remove(RESPONSE_CODE); - return Mono.just(clientResponse); - } - - private ExchangeFilterFunction getRequestFilter() { - return ExchangeFilterFunction.ofRequestProcessor(this::logRequest); - } - - Mono<ClientRequest> logRequest(ClientRequest clientRequest) { - MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); - logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); - logger.trace("HTTP request headers: {}", clientRequest.headers()); - MDC.remove(SERVICE_NAME); - return Mono.just(clientRequest); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index eed0f0bd..708865fa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,18 +96,17 @@ public class JsonMessageParser { * @return a <code>Flux</code> containing messages. */ - public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) { - return rawMessage.flatMapMany(this::createMessageData); + public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) { + return rawMessage.flatMap(this::createMessageData); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); if (element.isJsonPrimitive()) { - return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()); + return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject()); } else if (element.isJsonObject()) { return Optional.of((JsonObject) element); } else { - return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject()); } } @@ -117,9 +116,9 @@ public class JsonMessageParser { } /** - * Extract info from string and create a Flux of {@link FileReadyMessage}. + * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}. * - * @param rawMessage - results from DMaaP + * @param jsonElement - result from DMaaP * @return reactive Flux of FileReadyMessages */ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index a46e17ba..9de37e8c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,10 +37,10 @@ import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.ssl.SSLContextBuilder; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -61,14 +61,14 @@ public class DmaapProducerHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final DmaapPublisherConfiguration configuration; + private final PublisherConfiguration configuration; /** * Constructor DmaapProducerReactiveHttpClient. * * @param dmaapPublisherConfiguration - DMaaP producer configuration object */ - public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + public DmaapProducerHttpClient(PublisherConfiguration dmaapPublisherConfiguration) { this.configuration = dmaapPublisherConfiguration; } @@ -131,7 +131,7 @@ public class DmaapProducerHttpClient { * @param request the request to add credentials to. */ public void addUserCredentialsToHead(HttpUriRequest request) { - String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword(); + String plainCreds = configuration.userName() + ":" + configuration.password(); byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 066983ae..0780e18e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +22,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; - -import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Component used to get messages from the MessageRouter. @@ -46,18 +40,14 @@ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; public DMaaPMessageConsumer(AppConfig datafileAppConfig) { - this(datafileAppConfig, new JsonMessageParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(datafileAppConfig, new JsonMessageParser()); } - protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) { this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = jsonMessageParser; - this.httpClientFactory = httpClientFactory; } /** @@ -68,21 +58,20 @@ public class DMaaPMessageConsumer { public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); try { - DMaaPConsumerReactiveHttpClient client = createHttpClient(); - return consume((client.getDMaaPConsumerResponse(Optional.empty()))); - } catch (DatafileTaskException e) { + ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); + MessageRouterSubscriber messageRouterSubscriber = + dmaapConsumerConfiguration.getMessageRouterSubscriber(); + Flux<JsonElement> responseElements = + messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + return consume(responseElements); + } catch (Exception e) { logger.warn("Unable to get response from message router", e); return Flux.empty(); } } - private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { - logger.trace("consume called with arg {}", message); - return jsonMessageParser.getMessagesFromJson(message); - } - - public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { - return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); + private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) { + return jsonMessageParser.getMessagesFromJson(messages); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index cfaf1753..8b86440a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import java.io.File; -import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -113,7 +112,7 @@ public class DataRouterPublisher { private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); + JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); URI uri = new DefaultUriBuilderFactory( datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // @@ -155,12 +154,8 @@ public class DataRouterPublisher { } DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { - try { - DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); - return new DmaapProducerHttpClient(cfg); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot resolve producer client", e); - } + PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier); + return new DmaapProducerHttpClient(publisherConfiguration); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 037803bd..a9973cf4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. +* Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +22,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -113,12 +113,7 @@ public class PublishedChecker { return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) - throws DatafileTaskException { - try { - return new DmaapProducerHttpClient(publisherConfig.toDmaap()); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot create published checker client", e); - } + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) { + return new DmaapProducerHttpClient(publisherConfig); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 42a6fea3..eba0a6cb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -302,8 +302,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { MDC.setContextMap(context); - logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), - this.applicationConfiguration.getDmaapConsumerConfiguration()); + logger.error("Polling for file ready message failed, exception: {}", exception.toString()); return Flux.empty(); } |