diff options
author | Remigiusz Janeczek <remigiusz.janeczek@nokia.com> | 2020-08-18 12:48:19 +0200 |
---|---|---|
committer | Remigiusz Janeczek <remigiusz.janeczek@nokia.com> | 2020-08-25 16:24:17 +0200 |
commit | 3b6c2739505c097f3ea32475ebd3db41bbaae7ef (patch) | |
tree | c68d7182520ff7bd48dd046cbc03373d38235cc4 /datafile-app-server | |
parent | 42a2932087a43b05cdcd2247108246b3ccdb0b2b (diff) |
Update dcae SDK from 1.1.6 to 1.4.2
Bump project version from 1.4.2 to 1.4.3
Update deprecated calls to JsonParser
Make logs always go to file and console
Issue-ID: DCAEGEN2-2267
Signed-off-by: Remigiusz Janeczek <remigiusz.janeczek@nokia.com>
Change-Id: Ib8d7f82b3daf03ca327581c9a5dc4f6f27a20141
Diffstat (limited to 'datafile-app-server')
28 files changed, 368 insertions, 878 deletions
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml index 69771e20..d3cced84 100644 --- a/datafile-app-server/config/application.yaml +++ b/datafile-app-server/config/application.yaml @@ -21,6 +21,7 @@ logging: org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR org.onap.dcaegen2.collectors.datafile: WARN + org.onap.dcaegen2: WARN file: /var/log/ONAP/application.log app: filepath: config/datafile_endpoints_test.json diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml index b06ed824..896f5d25 100644 --- a/datafile-app-server/pom.xml +++ b/datafile-app-server/pom.xml @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- ~ ============LICENSE_START======================================================= - ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved. + ~ Copyright (C) 2018-2020 NOKIA Intellectual Property, 2018-2020 Nordix Foundation. All rights reserved. ~ Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,7 +25,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors</groupId> <artifactId>datafile</artifactId> - <version>1.4.2-SNAPSHOT</version> + <version>1.4.3-SNAPSHOT</version> </parent> <groupId>org.onap.dcaegen2.collectors.datafile</groupId> @@ -47,6 +47,10 @@ <artifactId>dmaap-client</artifactId> </dependency> <dependency> + <groupId>org.onap.dcaegen2.services.sdk.security</groupId> + <artifactId>ssl</artifactId> + </dependency> + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> </dependency> @@ -71,6 +75,10 @@ <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> + <dependency> + <groupId>org.immutables</groupId> + <artifactId>value</artifactId> + </dependency> <!-- Actuator dependencies --> <dependency> diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java index c257ceed..d933e337 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -41,8 +41,9 @@ import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticConte import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.CbsClientConfigurationException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,12 +72,12 @@ public class AppConfig { private static final Logger logger = LoggerFactory.getLogger(AppConfig.class); + @Value("#{systemEnvironment}") + Properties systemEnvironment; private ConsumerConfiguration dmaapConsumerConfiguration; private Map<String, PublisherConfiguration> publishingConfigurations; private FtpesConfig ftpesConfiguration; private SftpConfig sftpConfiguration; - @Value("#{systemEnvironment}") - Properties systemEnvironment; private Disposable refreshConfigTask = null; @NotEmpty @@ -102,8 +103,8 @@ public class AppConfig { } Flux<AppConfig> createRefreshTask(Map<String, String> context) { - return getEnvironment(systemEnvironment, context) // - .flatMap(this::createCbsClient) // + return createCbsClientConfiguration() + .flatMap(this::createCbsClient) .flatMapMany(this::periodicConfigurationUpdates) // .map(this::parseCloudConfig) // .onErrorResume(this::onErrorResume); @@ -175,19 +176,25 @@ public class AppConfig { return Mono.empty(); } - Mono<EnvProperties> getEnvironment(Properties systemEnvironment, Map<String, String> context) { - return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment, context); + Mono<CbsClientConfiguration> createCbsClientConfiguration() { + try { + return Mono.just(CbsClientConfiguration.fromEnvironment()); + } catch (CbsClientConfigurationException e) { + return Mono.error(e); + } } - Mono<CbsClient> createCbsClient(EnvProperties env) { - return CbsClientFactory.createCbsClient(env); + Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); } private AppConfig parseCloudConfig(JsonObject configurationObject) { try { - CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment); - setConfiguration(parser.getDmaapConsumerConfig(), parser.getDmaapPublisherConfigurations(), - parser.getFtpesConfig(), parser.getSftpConfig()); + CloudConfigParser parser = + new CloudConfigParser(configurationObject, systemEnvironment); + setConfiguration(parser.getConsumerConfiguration(), + parser.getDmaapPublisherConfigurations(), parser.getFtpesConfig(), + parser.getSftpConfig()); logConfig(); } catch (DatafileTaskException e) { logger.error("Could not parse configuration {}", e.toString(), e); @@ -207,8 +214,7 @@ public class AppConfig { ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); try (InputStream inputStream = createInputStream(filepath)) { - JsonParser parser = new JsonParser(); - JsonObject rootObject = getJsonElement(parser, inputStream).getAsJsonObject(); + JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); if (rootObject == null) { throw new JsonSyntaxException("Root is not a json object"); } @@ -228,8 +234,8 @@ public class AppConfig { this.sftpConfiguration = sftpConfig; } - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream)); + JsonElement getJsonElement(InputStream inputStream) { + return JsonParser.parseReader(new InputStreamReader(inputStream)); } InputStream createInputStream(@NotNull String filepath) throws IOException { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java index a86a32b8..6ace4aae 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,16 +21,32 @@ package org.onap.dcaegen2.collectors.datafile.configuration; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; -import java.util.Set; import javax.validation.constraints.NotNull; +import io.vavr.collection.Stream; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore; +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; /** * Parses the cloud configuration. @@ -51,6 +67,12 @@ public class CloudConfigParser { private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING = "sftp.security.strictHostKeyChecking"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup"; + private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId"; + private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs"; + private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1; + private static final int FIRST_SOURCE_INDEX = 0; + private final Properties systemEnvironment; private final JsonObject jsonObject; @@ -78,7 +100,7 @@ public class CloudConfigParser { PublisherConfiguration cfg = ImmutablePublisherConfiguration.builder() // .publishUrl(getAsString(feedConfig, "publish_url")) // - .passWord(getAsString(feedConfig, "password")) // + .password(getAsString(feedConfig, "password")) // .userName(getAsString(feedConfig, "username")) // .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) // .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) // @@ -98,24 +120,51 @@ public class CloudConfigParser { * Get the consumer configuration. * * @return the consumer configuration. - * @throws DatafileTaskException if a member of the configuration is missing. + * @throws DatafileTaskException if the configuration is invalid. */ - public @NotNull ConsumerConfiguration getDmaapConsumerConfig() throws DatafileTaskException { - JsonObject consumerCfg = jsonObject.get("streams_subscribes").getAsJsonObject(); - Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet(); - if (topics.size() != 1) { - throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + topics); + public @NotNull ConsumerConfiguration getConsumerConfiguration() throws DatafileTaskException { + try { + MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig(); + MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest(); + MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig); + return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest); + } catch (Exception e) { + throw new DatafileTaskException("Could not parse message router consumer configuration", e); } - JsonObject topic = topics.iterator().next().getValue().getAsJsonObject(); - JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject(); - String topicUrl = getAsString(dmaapInfo, "topic_url"); - - return ImmutableConsumerConfiguration.builder().topicUrl(topicUrl) - .trustStorePath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH)) - .trustStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH)) - .keyStorePath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH)) - .keyStorePasswordPath(getAsString(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH)) - .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) // + } + + private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException { + return ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null) + .build(); + } + + private SecurityKeys createSecurityKeys() throws DatafileTaskException { + return ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH))) + .keyStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH))) + .trustStore(ImmutableSecurityKeysStore.of(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH))) + .trustStorePassword(Passwords.fromPath(getAsPath(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH))) + .build(); + } + + private boolean isDmaapCertAuthEnabled(JsonObject config) { + return config.get(DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean(); + } + + private MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() throws DatafileTaskException { + Stream<RawDataStream<JsonObject>> sources = DataStreams.namedSources(jsonObject); + if (sources.size() != EXPECTED_NUMBER_OF_SOURCE_TOPICS) { + throw new DatafileTaskException("Invalid configuration, number of topic must be one, config: " + sources); + } + RawDataStream<JsonObject> source = sources.get(FIRST_SOURCE_INDEX); + MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP)) + .sourceDefinition(parsedSource) + .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID)) + .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong())) .build(); } @@ -176,4 +225,8 @@ public class CloudConfigParser { return get(obj, memberName).getAsJsonObject(); } + private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException { + return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath)); + } + } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java index 4db7963d..89fcf1c2 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfiguration.java @@ -1,113 +1,56 @@ /*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at + * ============LICENSE_START======================================================= + * Copyright (C) 2020 NOKIA Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= */ package org.onap.dcaegen2.collectors.datafile.configuration; -import java.net.MalformedURLException; -import java.net.URL; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; -import org.immutables.gson.Gson; -import org.immutables.value.Value; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; +public class ConsumerConfiguration { -@Value.Immutable -@Value.Style(redactedMask = "####") -@Gson.TypeAdapters -public abstract class ConsumerConfiguration { - @Value.Redacted - public abstract String topicUrl(); + private final MessageRouterSubscriberConfig messageRouterSubscriberConfig; + private final MessageRouterSubscriber messageRouterSubscriber; + private final MessageRouterSubscribeRequest messageRouterSubscribeRequest; - public abstract String trustStorePath(); - - public abstract String trustStorePasswordPath(); - - public abstract String keyStorePath(); - - public abstract String keyStorePasswordPath(); - - public abstract Boolean enableDmaapCertAuth(); - - /** - * Gets the configuration in the SDK version. - * - * @return a <code>DmaapConsumerConfiguration</code> representing the configuration. - * - * @throws DatafileTaskException if something is wrong with the topic URL. - */ - public DmaapConsumerConfiguration toDmaap() throws DatafileTaskException { - try { - URL url = new URL(topicUrl()); - String passwd = ""; - String userName = ""; - if (url.getUserInfo() != null) { - String[] userInfo = url.getUserInfo().split(":"); - userName = userInfo[0]; - passwd = userInfo[1]; - } - String urlPath = url.getPath(); - DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath); - - return new ImmutableDmaapConsumerConfiguration.Builder() // - .endpointUrl(topicUrl()) // - .dmaapContentType("application/json") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(path.dmaapTopicName) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(userName) // - .dmaapUserPassword(passwd) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .consumerId(path.consumerId) // - .consumerGroup(path.consumerGroup) // - .timeoutMs(-1) // - .messageLimit(-1) // - .build(); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Could not parse the URL", e); - } + public ConsumerConfiguration(MessageRouterSubscriberConfig messageRouterSubscriberConfig, + MessageRouterSubscriber messageRouterSubscriber, MessageRouterSubscribeRequest messageRouterSubscribeRequest) { + this.messageRouterSubscriberConfig = messageRouterSubscriberConfig; + this.messageRouterSubscriber = messageRouterSubscriber; + this.messageRouterSubscribeRequest = messageRouterSubscribeRequest; } - private class DmaapConsumerUrlPath { - final String dmaapTopicName; - final String consumerGroup; - final String consumerId; - - DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { - this.dmaapTopicName = dmaapTopicName; - this.consumerGroup = consumerGroup; - this.consumerId = consumerId; - } + public MessageRouterSubscriber getMessageRouterSubscriber() { + return messageRouterSubscriber; } - private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws DatafileTaskException { - String[] tokens = urlPath.split("/"); // /events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12 - if (tokens.length != 5) { - throw new DatafileTaskException("The path has incorrect syntax: " + urlPath); - } + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return messageRouterSubscribeRequest; + } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/unauthenticated.VES_NOTIFICATION_OUTPUT - final String consumerGroup = tokens[3]; // OpenDcae-c12 - final String consumerId = tokens[4]; // C12 - return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId); + public MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() { + return messageRouterSubscriberConfig; } + @Override + public String toString() { + return "ConsumerConfiguration{" + "securityKeys=" + messageRouterSubscriberConfig.securityKeys() + + ", consumerGroup=" + messageRouterSubscribeRequest.consumerGroup() + ", consumerID=" + + messageRouterSubscribeRequest.consumerId() + '}'; + } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java deleted file mode 100644 index ad5f648d..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START======================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * ================================================================================================= - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END========================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import java.util.Map; -import java.util.Optional; -import java.util.Properties; - -import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import reactor.core.publisher.Mono; - -/** - * Handling the Consul connection. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 - */ -class EnvironmentProcessor { - - private static final int DEFAULT_CONSUL_PORT = 8500; - private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); - - private EnvironmentProcessor() { - } - - static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment, Map<String, String> contextMap) { - MDC.setContextMap(contextMap); - logger.trace("Loading configuration from system environment variables"); - EnvProperties envProperties; - try { - envProperties = ImmutableEnvProperties.builder() // - .consulHost(getConsulHost(systemEnvironment)) // - .consulPort(getConsultPort(systemEnvironment)) // - .cbsName(getConfigBindingService(systemEnvironment)) // - .appName(getService(systemEnvironment)) // - .build(); - } catch (EnvironmentLoaderException e) { - return Mono.error(e); - } - logger.trace("Evaluated environment system variables {}", envProperties); - return Mono.just(envProperties); - } - - private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) - .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); - } - - private static Integer getConsultPort(Properties systemEnvironments) { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) // - .map(Integer::valueOf) // - .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); - } - - private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) // - .orElseThrow( - () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); - } - - private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional - .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) - .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) - .orElseThrow(() -> new EnvironmentLoaderException( - "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); - } - - private static Integer getDefaultPortOfConsul() { - logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); - return DEFAULT_CONSUL_PORT; - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java index d7451bdb..fa7d7841 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START====================================================================== * Copyright (C) 2018,2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2020 Nokia. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,13 +17,10 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import java.net.MalformedURLException; -import java.net.URL; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; + @Value.Immutable @Value.Style(redactedMask = "####") @@ -36,7 +34,7 @@ public interface PublisherConfiguration { String userName(); @Value.Redacted - String passWord(); + String password(); String trustStorePath(); @@ -50,30 +48,4 @@ public interface PublisherConfiguration { String changeIdentifier(); - /** - * Get the publisher configuration in SDK format. - * - * @return a <code>DmaapPublisherConfiguration</code> contining the publisher configuration. - * @throws MalformedURLException if the publish URL is malformed. - */ - default DmaapPublisherConfiguration toDmaap() throws MalformedURLException { - URL url = new URL(publishUrl()); - String urlPath = url.getPath(); - - return new ImmutableDmaapPublisherConfiguration.Builder() // - .endpointUrl(publishUrl()) // - .dmaapContentType("application/octet-stream") // - .dmaapPortNumber(url.getPort()) // - .dmaapHostName(url.getHost()) // - .dmaapTopicName(urlPath) // - .dmaapProtocol(url.getProtocol()) // - .dmaapUserName(this.userName()) // - .dmaapUserPassword(this.passWord()) // - .trustStorePath(this.trustStorePath()) // - .trustStorePasswordPath(this.trustStorePasswordPath()) // - .keyStorePath(this.keyStorePath()) // - .keyStorePasswordPath(this.keyStorePasswordPath()) // - .enableDmaapCertAuth(this.enableDmaapCertAuth()) // - .build(); - } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java deleted file mode 100644 index 5a8806b4..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClient.java +++ /dev/null @@ -1,91 +0,0 @@ -/*- - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; -import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME; - -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.client.ClientRequest; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.ExchangeFilterFunction; -import org.springframework.web.reactive.function.client.WebClient; -import org.springframework.web.reactive.function.client.WebClient.Builder; -import reactor.core.publisher.Mono; - -/** - * Web client for the DMaaP MessageRouter. - * - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18 - */ -public class DmaapWebClient { - - private static final Logger logger = LoggerFactory.getLogger(DmaapWebClient.class); - - private String contentType; - - /** - * Creating DmaapReactiveWebClient passing to them basic DmaapConfig. - * - * @param dmaapCustomConfig - configuration object - * @return DmaapReactiveWebClient - */ - public DmaapWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) { - this.contentType = dmaapCustomConfig.dmaapContentType(); - return this; - } - - /** - * Construct Reactive WebClient with appropriate settings. - * - * @return WebClient - */ - public WebClient build() { - Builder webClientBuilder = WebClient.builder() // - .defaultHeader(HttpHeaders.CONTENT_TYPE, contentType) // - .filter(getRequestFilter()) // - .filter(getResponseFilter()); - return webClientBuilder.build(); - } - - private ExchangeFilterFunction getResponseFilter() { - return ExchangeFilterFunction.ofResponseProcessor(this::logResponse); - } - - Mono<ClientResponse> logResponse(ClientResponse clientResponse) { - MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode())); - logger.trace("Response Status {}", clientResponse.statusCode()); - MDC.remove(RESPONSE_CODE); - return Mono.just(clientResponse); - } - - private ExchangeFilterFunction getRequestFilter() { - return ExchangeFilterFunction.ofRequestProcessor(this::logRequest); - } - - Mono<ClientRequest> logRequest(ClientRequest clientRequest) { - MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url())); - logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url()); - logger.trace("HTTP request headers: {}", clientRequest.headers()); - MDC.remove(SERVICE_NAME); - return Mono.just(clientRequest); - } -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index eed0f0bd..708865fa 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,18 +96,17 @@ public class JsonMessageParser { * @return a <code>Flux</code> containing messages. */ - public Flux<FileReadyMessage> getMessagesFromJson(Mono<JsonElement> rawMessage) { - return rawMessage.flatMapMany(this::createMessageData); + public Flux<FileReadyMessage> getMessagesFromJson(Flux<JsonElement> rawMessage) { + return rawMessage.flatMap(this::createMessageData); } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); if (element.isJsonPrimitive()) { - return Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()); + return Optional.of(JsonParser.parseString(element.getAsString()).getAsJsonObject()); } else if (element.isJsonObject()) { return Optional.of((JsonObject) element); } else { - return Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); + return Optional.of(JsonParser.parseString(element.toString()).getAsJsonObject()); } } @@ -117,9 +116,9 @@ public class JsonMessageParser { } /** - * Extract info from string and create a Flux of {@link FileReadyMessage}. + * Extract info from jsonElement and create a Flux of {@link FileReadyMessage}. * - * @param rawMessage - results from DMaaP + * @param jsonElement - result from DMaaP * @return reactive Flux of FileReadyMessages */ private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java index a46e17ba..9de37e8c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClient.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START======================================================= - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,10 +37,10 @@ import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.ssl.SSLContextBuilder; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -61,14 +61,14 @@ public class DmaapProducerHttpClient { private final Logger logger = LoggerFactory.getLogger(this.getClass()); - private final DmaapPublisherConfiguration configuration; + private final PublisherConfiguration configuration; /** * Constructor DmaapProducerReactiveHttpClient. * * @param dmaapPublisherConfiguration - DMaaP producer configuration object */ - public DmaapProducerHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) { + public DmaapProducerHttpClient(PublisherConfiguration dmaapPublisherConfiguration) { this.configuration = dmaapPublisherConfiguration; } @@ -131,7 +131,7 @@ public class DmaapProducerHttpClient { * @param request the request to add credentials to. */ public void addUserCredentialsToHead(HttpUriRequest request) { - String plainCreds = configuration.dmaapUserName() + ":" + configuration.dmaapUserPassword(); + String plainCreds = configuration.userName() + ":" + configuration.password(); byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1); byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes); String base64Creds = new String(base64CredsBytes); diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 066983ae..0780e18e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +22,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; - -import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Component used to get messages from the MessageRouter. @@ -46,18 +40,14 @@ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; public DMaaPMessageConsumer(AppConfig datafileAppConfig) { - this(datafileAppConfig, new JsonMessageParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(datafileAppConfig, new JsonMessageParser()); } - protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) { this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = jsonMessageParser; - this.httpClientFactory = httpClientFactory; } /** @@ -68,21 +58,20 @@ public class DMaaPMessageConsumer { public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); try { - DMaaPConsumerReactiveHttpClient client = createHttpClient(); - return consume((client.getDMaaPConsumerResponse(Optional.empty()))); - } catch (DatafileTaskException e) { + ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); + MessageRouterSubscriber messageRouterSubscriber = + dmaapConsumerConfiguration.getMessageRouterSubscriber(); + Flux<JsonElement> responseElements = + messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + return consume(responseElements); + } catch (Exception e) { logger.warn("Unable to get response from message router", e); return Flux.empty(); } } - private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { - logger.trace("consume called with arg {}", message); - return jsonMessageParser.getMessagesFromJson(message); - } - - public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { - return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); + private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) { + return jsonMessageParser.getMessagesFromJson(messages); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index cfaf1753..8b86440a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import java.io.File; -import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -113,7 +112,7 @@ public class DataRouterPublisher { private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); + JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); URI uri = new DefaultUriBuilderFactory( datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // @@ -155,12 +154,8 @@ public class DataRouterPublisher { } DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { - try { - DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); - return new DmaapProducerHttpClient(cfg); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot resolve producer client", e); - } + PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier); + return new DmaapProducerHttpClient(publisherConfiguration); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 037803bd..a9973cf4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. +* Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +22,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -113,12 +113,7 @@ public class PublishedChecker { return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) - throws DatafileTaskException { - try { - return new DmaapProducerHttpClient(publisherConfig.toDmaap()); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot create published checker client", e); - } + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) { + return new DmaapProducerHttpClient(publisherConfig); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 42a6fea3..eba0a6cb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -302,8 +302,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { MDC.setContextMap(context); - logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), - this.applicationConfiguration.getDmaapConsumerConfiguration()); + logger.error("Polling for file ready message failed, exception: {}", exception.toString()); return Flux.empty(); } diff --git a/datafile-app-server/src/main/resources/logback-spring.xml b/datafile-app-server/src/main/resources/logback-spring.xml index 1b9818d5..89405f2f 100644 --- a/datafile-app-server/src/main/resources/logback-spring.xml +++ b/datafile-app-server/src/main/resources/logback-spring.xml @@ -16,49 +16,28 @@ |%thread |%n"/> - <springProfile name="dev"> - <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>${defaultPattern}</pattern> - </encoder> - </appender> + <appender name="CONSOLE" target="SYSTEM_OUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${defaultPattern}</pattern> + </encoder> + </appender> - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${defaultPattern}</pattern> - </encoder> - <file>${logPath}/${outputFilename}.log</file> - <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> - <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern> - <MaxFileSize>${maxFileSize}</MaxFileSize> - <MaxHistory>${maxHistory}</MaxHistory> - <TotalSizeCap>${totalSizeCap}</TotalSizeCap> - </rollingPolicy> - </appender> - <root level="ERROR"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> - </springProfile> + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${defaultPattern}</pattern> + </encoder> + <file>${logPath}/${outputFilename}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.log</fileNamePattern> + <MaxFileSize>${maxFileSize}</MaxFileSize> + <MaxHistory>${maxHistory}</MaxHistory> + <TotalSizeCap>${totalSizeCap}</TotalSizeCap> + </rollingPolicy> + </appender> + <root level="ERROR"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> - <springProfile name="prod"> - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${defaultPattern}</pattern> - </encoder> - <file>${logPath}/${outputFilename}.log</file> - <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> - <fileNamePattern>${outputFilename}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern> - <MaxFileSize>${maxFileSize}</MaxFileSize> - <MaxHistory>${maxHistory}</MaxHistory> - <TotalSizeCap>${totalSizeCap}</TotalSizeCap> - </rollingPolicy> - </appender> - - <root level="ERROR"> - <appender-ref ref="ROLLING-FILE"/> - </root> - </springProfile> -</configuration>
\ No newline at end of file +</configuration> diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java index d0f02d69..dc8a1229 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,20 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.configuration; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; - import com.google.common.base.Charsets; import com.google.common.io.Resources; import com.google.gson.JsonElement; @@ -37,16 +25,6 @@ import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Properties; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,15 +32,33 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Tests the AppConfig. * @@ -73,50 +69,18 @@ public class AppConfigTest { public static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - public static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = // - new ImmutableDmaapConsumerConfiguration.Builder() // - .endpointUrl( - "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") - .timeoutMs(-1) // - .dmaapHostName("localhost") // - .dmaapUserName("dradmin") // - .dmaapUserPassword("dradmin") // - .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") // - .dmaapPortNumber(2222) // - .dmaapContentType("application/json") // - .messageLimit(-1) // - .dmaapProtocol("http") // - .consumerId("C12") // - .consumerGroup("OpenDcae-c12") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - public static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() // - .topicUrl( - "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = // ImmutablePublisherConfiguration.builder() // .publishUrl("https://localhost:3907/publish/1") // .logUrl("https://localhost:3907/feedlog/1") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // + .trustStorePath("src/test/resources/trust.jks") // + .trustStorePasswordPath("src/test/resources/trust.pass") // + .keyStorePath("src/test/resources/cert.jks") // + .keyStorePasswordPath("src/test/resources/jks.pass") // .enableDmaapCertAuth(true) // .changeIdentifier("PM_MEAS_FILES") // .userName("CYE9fl40") // - .passWord("izBJD8nLjawq0HMG") // + .password("izBJD8nLjawq0HMG") // .build(); private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = // @@ -127,35 +91,10 @@ public class AppConfigTest { .trustedCaPasswordPath("/src/test/resources/ftp.jks.pass") // .build(); - private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = // - new ImmutableDmaapPublisherConfiguration.Builder() // - .endpointUrl("https://localhost:3907/publish/1") // - .dmaapTopicName("/publish/1") // - .dmaapUserPassword("izBJD8nLjawq0HMG") // - .dmaapPortNumber(3907) // - .dmaapProtocol("https") // - .dmaapContentType("application/octet-stream") // - .dmaapHostName("localhost") // - .dmaapUserName("CYE9fl40") // - .trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); - - private static EnvProperties properties() { - return ImmutableEnvProperties.builder() // - .consulHost("host") // - .consulPort(123) // - .cbsName("cbsName") // - .appName("appName") // - .build(); - } - private AppConfig appConfigUnderTest; private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext(); CbsClient cbsClient = mock(CbsClient.class); + CbsClientConfiguration cbsClientConfiguration = mock(CbsClientConfiguration.class); @BeforeEach void setUp() { @@ -175,13 +114,11 @@ public class AppConfigTest { ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration(); Assertions.assertNotNull(consumerCfg); - assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG); - assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG); + assertThat(consumerCfg).satisfies(this::checkCorrectConsumerConfiguration); PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER); Assertions.assertNotNull(publisherCfg); assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG); - assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG); FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration(); assertThat(ftpesConfig).isNotNull(); @@ -245,7 +182,7 @@ public class AppConfigTest { doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any()); JsonElement jsonElement = mock(JsonElement.class); when(jsonElement.isJsonObject()).thenReturn(false); - doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class)); + doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(InputStream.class)); appConfigUnderTest.loadConfigurationFromFile(); // Then @@ -266,15 +203,13 @@ public class AppConfigTest { .expectSubscription() // .verifyComplete(); // - assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined")); + assertTrue(logAppender.list.toString().contains("CbsClientConfigurationException")); } @Test public void whenPeriodicConfigRefreshNoConsul() { - EnvProperties props = properties(); - doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); - - doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); + doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration(); + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration); Flux<JsonObject> err = Flux.error(new IOException()); doReturn(err).when(cbsClient).updates(any(), any(), any()); @@ -292,9 +227,8 @@ public class AppConfigTest { @Test public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException { - EnvProperties props = properties(); - doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); - doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); + doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration(); + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration); Flux<JsonObject> json = Flux.just(getJsonRootObject()); doReturn(json).when(cbsClient).updates(any(), any(), any()); @@ -312,10 +246,8 @@ public class AppConfigTest { @Test public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException { - EnvProperties props = properties(); - doReturn(Mono.just(props)).when(appConfigUnderTest).getEnvironment(any(), any()); - - doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(props); + doReturn(Mono.just(cbsClientConfiguration)).when(appConfigUnderTest).createCbsClientConfiguration(); + doReturn(Mono.just(cbsClient)).when(appConfigUnderTest).createCbsClient(cbsClientConfiguration); Flux<JsonObject> json = Flux.just(getJsonRootObject()); Flux<JsonObject> err = Flux.error(new IOException()); // no config entry created by the @@ -334,8 +266,21 @@ public class AppConfigTest { Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration()); } + private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) { + MessageRouterSubscribeRequest messageRouterSubscribeRequest = + consumerConfiguration.getMessageRouterSubscribeRequest(); + assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12"); + assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12"); + assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl()) + .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT"); + SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys(); + assertThat(securityKeys.keyStore().path().toString()).isEqualTo("src/test/resources/cert.jks"); + assertThat(securityKeys.trustStore().path().toString()).isEqualTo("src/test/resources/trust.jks"); + assertThat(consumerConfiguration.getMessageRouterSubscriber()).isNotNull(); + } + private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException { - JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject(); + JsonObject rootObject = JsonParser.parseReader(new InputStreamReader(getCorrectJson())).getAsJsonObject(); return rootObject; } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java deleted file mode 100644 index bdeb1c1e..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/ConsumerConfigurationTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.configuration; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; - -public class ConsumerConfigurationTest { - @Test - public void toDmaapSuccess() throws DatafileTaskException { - ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() // - .topicUrl( - "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") - .trustStorePath("") // - .trustStorePasswordPath("") // - .keyStorePath("") // - .keyStorePasswordPath("") // - .enableDmaapCertAuth(Boolean.FALSE) // - .build(); - - DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap(); - assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol()); - assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName()); - assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber()); - assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup()); - assertEquals("C12", dmaapConsumerConfiguration.consumerId()); - } - - @Test - public void toDmaapNoUserInfoSuccess() throws DatafileTaskException { - ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() // - .topicUrl( - "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12") - .trustStorePath("") // - .trustStorePasswordPath("") // - .keyStorePath("") // - .keyStorePasswordPath("") // - .enableDmaapCertAuth(Boolean.FALSE) // - .build(); - - DmaapConsumerConfiguration dmaapConsumerConfiguration = configurationUnderTest.toDmaap(); - assertEquals("http", dmaapConsumerConfiguration.dmaapProtocol()); - assertEquals("message-router.onap.svc.cluster.local", dmaapConsumerConfiguration.dmaapHostName()); - assertEquals(Integer.valueOf("2222"), dmaapConsumerConfiguration.dmaapPortNumber()); - assertEquals("OpenDcae-c12", dmaapConsumerConfiguration.consumerGroup()); - assertEquals("C12", dmaapConsumerConfiguration.consumerId()); - } - - @Test - public void toDmaapWhenInvalidUrlThrowException() throws DatafileTaskException { - ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() // - .topicUrl("//admin:admin@message-router.onap.svc.cluster.local:2222//events/").trustStorePath("") // - .trustStorePasswordPath("") // - .keyStorePath("") // - .keyStorePasswordPath("") // - .enableDmaapCertAuth(Boolean.FALSE) // - .build(); - - DatafileTaskException exception = - assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap()); - assertEquals("Could not parse the URL", exception.getMessage()); - } - - @Test - public void toDmaapWhenInvalidPathThrowException() throws DatafileTaskException { - ConsumerConfiguration configurationUnderTest = ImmutableConsumerConfiguration.builder() // - .topicUrl("http://admin:admin@message-router.onap.svc.cluster.local:2222//events/") // - .trustStorePath("") // - .trustStorePasswordPath("") // - .keyStorePath("") // - .keyStorePasswordPath("") // - .enableDmaapCertAuth(Boolean.FALSE) // - .build(); - - DatafileTaskException exception = - assertThrows(DatafileTaskException.class, () -> configurationUnderTest.toDmaap()); - assertEquals("The path has incorrect syntax: //events/", exception.getMessage()); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java deleted file mode 100644 index d4e060ff..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapWebClientTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import static org.junit.Assert.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; - -import java.net.URI; -import java.net.URISyntaxException; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; -import org.springframework.web.reactive.function.client.ClientRequest; -import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; - -@ExtendWith(MockitoExtension.class) -class DmaapWebClientTest { - - @Mock - private DmaapConsumerConfiguration dmaapConsumerConfigurationMock; - - @Mock - private ClientResponse clientResponseMock; - - @Mock - private ClientRequest clientRequesteMock; - - @Test - void buildsDMaaPReactiveWebClientProperly() { - when(dmaapConsumerConfigurationMock.dmaapContentType()).thenReturn("*/*"); - WebClient dmaapWebClientUndetTest = new DmaapWebClient() // - .fromConfiguration(dmaapConsumerConfigurationMock) // - .build(); - - verify(dmaapConsumerConfigurationMock, times(1)).dmaapContentType(); - assertNotNull(dmaapWebClientUndetTest); - } - - @Test - public void logResponseSuccess() { - DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient(); - - when(clientResponseMock.statusCode()).thenReturn(HttpStatus.OK); - - final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true); - Mono<ClientResponse> logResponse = dmaapWebClientUndetTest.logResponse(clientResponseMock); - - assertEquals(clientResponseMock, logResponse.block()); - - assertEquals(Level.TRACE, logAppender.list.get(0).getLevel()); - assertEquals("Response Status 200 OK", logAppender.list.get(0).getFormattedMessage()); - - logAppender.stop(); - } - - @Test - public void logRequestSuccess() throws URISyntaxException { - when(clientRequesteMock.url()).thenReturn(new URI("http://test")); - when(clientRequesteMock.method()).thenReturn(HttpMethod.GET); - HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.add("header", "value"); - when(clientRequesteMock.headers()).thenReturn(httpHeaders); - - DmaapWebClient dmaapWebClientUndetTest = new DmaapWebClient(); - - final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapWebClient.class, true); - Mono<ClientRequest> logRequest = dmaapWebClientUndetTest.logRequest(clientRequesteMock); - - assertEquals(clientRequesteMock, logRequest.block()); - - assertEquals(Level.TRACE, logAppender.list.get(0).getLevel()); - assertEquals("Request: GET http://test", logAppender.list.get(0).getFormattedMessage()); - assertEquals(Level.TRACE, logAppender.list.get(1).getLevel()); - assertEquals("HTTP request headers: [header:\"value\"]", logAppender.list.get(1).getFormattedMessage()); - - logAppender.stop(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java index 8fb8c364..bfb9b13e 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java @@ -46,6 +46,7 @@ import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -120,11 +121,11 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNext(expectedMessage).verifyComplete(); } @@ -173,12 +174,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); String messageString = "[" + parsedString + "," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); - JsonElement jsonElement1 = new JsonParser().parse(messageString); + JsonElement jsonElement = JsonParser.parseString(parsedString); + JsonElement jsonElement1 = JsonParser.parseString(messageString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement1))) + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement1))) .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete(); } @@ -200,12 +201,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue(logAppender.list.toString() @@ -232,12 +233,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", @@ -293,9 +294,9 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); String messageString = "[{\"event\":{}}," + parsedString + "]"; JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser(); - JsonElement jsonElement = new JsonParser().parse(messageString); + JsonElement jsonElement = JsonParser.parseString(messageString); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNext(expectedMessage).verifyComplete(); } @@ -317,12 +318,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectComplete().verify(); assertTrue("Error missing in log", @@ -348,12 +349,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", @@ -374,12 +375,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", @@ -405,12 +406,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", @@ -438,12 +439,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).verifyComplete(); assertTrue("Error missing in log", @@ -504,11 +505,11 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNext(expectedMessage).verifyComplete(); } @@ -520,12 +521,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectComplete().verify(); assertTrue("Error missing in log", @@ -538,13 +539,13 @@ class JsonMessageParserTest { @Test void whenPassingJsonWithNullJsonElement_noFileData() { JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse("{}"); + JsonElement jsonElement = JsonParser.parseString("{}"); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectComplete().verify(); assertTrue("Error missing in log", @@ -569,12 +570,12 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(JsonMessageParser.class); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectNextCount(0).expectComplete().verify(); assertTrue("Error missing in log", @@ -601,11 +602,11 @@ class JsonMessageParserTest { String parsedString = message.getParsed(); JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser()); - JsonElement jsonElement = new JsonParser().parse(parsedString); + JsonElement jsonElement = JsonParser.parseString(parsedString); Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest) .getJsonObjectFromAnArray(jsonElement); - StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(jsonElement))).expectSubscription() + StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription() .expectComplete().verify(); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java index d4541efb..1ddb3a5c 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -50,10 +50,10 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper; import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; /** * Test for DmaapProducerHttpClient. @@ -73,7 +73,7 @@ class DmaapProducerHttpClientTest { private DmaapProducerHttpClient producerClientUnderTestSpy; - private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class); + private PublisherConfiguration dmaapPublisherConfigurationMock = mock(PublisherConfiguration.class); private HttpAsyncClientBuilderWrapper clientBuilderMock; @@ -83,11 +83,8 @@ class DmaapProducerHttpClientTest { @BeforeEach void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException { - when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST); - when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME); - when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT); - when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin"); - when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin"); + when(dmaapPublisherConfigurationMock.userName()).thenReturn("dradmin"); + when(dmaapPublisherConfigurationMock.password()).thenReturn("dradmin"); producerClientUnderTestSpy = spy(new DmaapProducerHttpClient(dmaapPublisherConfigurationMock)); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java index d4dd89f0..f0c8e3b3 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java @@ -21,26 +21,8 @@ package org.onap.dcaegen2.collectors.datafile.tasks; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.onap.dcaegen2.collectors.datafile.configuration.AppConfigTest.CORRECT_CONSUMER_CONFIG; - import com.google.gson.JsonElement; import com.google.gson.JsonParser; - -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Optional; - import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; @@ -58,13 +40,27 @@ import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; - +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + public class DMaaPMessageConsumerTest { private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady"; private static final String PRODUCT_NAME = "NrRadio"; @@ -90,8 +86,6 @@ public class DMaaPMessageConsumerTest { private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<>(); private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES"; - private DMaaPConsumerReactiveHttpClient httpClientMock; - private DMaaPMessageConsumer messageConsumer; private static String ftpesMessageString; private static JsonElement ftpesMessageJson; @@ -105,6 +99,7 @@ public class DMaaPMessageConsumerTest { private static AppConfig appConfig; private static ConsumerConfiguration dmaapConsumerConfiguration; + private static MessageRouterSubscriber messageRouterSubscriber; /** * Sets up data for the test. @@ -113,9 +108,6 @@ public class DMaaPMessageConsumerTest { public static void setUp() { appConfig = mock(AppConfig.class); - dmaapConsumerConfiguration = CORRECT_CONSUMER_CONFIG; - - JsonParser jsonParser = new JsonParser(); AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() // .location(FTPES_LOCATION) // @@ -133,7 +125,7 @@ public class DMaaPMessageConsumerTest { .build(); ftpesMessageString = ftpesJsonMessage.toString(); - ftpesMessageJson = jsonParser.parse(ftpesMessageString); + ftpesMessageJson = JsonParser.parseString(ftpesMessageString); MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() // .productName(PRODUCT_NAME) // @@ -175,7 +167,7 @@ public class DMaaPMessageConsumerTest { .addAdditionalField(sftpAdditionalField) // .build(); sftpMessageString = sftpJsonMessage.toString(); - sftpMessageJson = jsonParser.parse(sftpMessageString); + sftpMessageJson = JsonParser.parseString(sftpMessageString); sftpFileData = ImmutableFileData.builder() // .name(PM_FILE_NAME) // .location(SFTP_LOCATION) // @@ -220,46 +212,50 @@ public class DMaaPMessageConsumerTest { .expectError(DatafileTaskException.class) // .verify(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); + verify(messageRouterSubscriber, times(1)) + .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); } @Test - public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException { + public void whenFtpes_ReturnsCorrectResponse() { prepareMocksForDmaapConsumer(Optional.of(ftpesMessageJson), expectedFtpesMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedFtpesMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); - verifyNoMoreInteractions(httpClientMock); + + + verify(messageRouterSubscriber, times(1)) + .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + verifyNoMoreInteractions(messageRouterSubscriber); } @Test - public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException { + public void whenSftp_ReturnsCorrectResponse() { prepareMocksForDmaapConsumer(Optional.of(sftpMessageJson), expectedSftpMessage); StepVerifier.create(messageConsumer.getMessageRouterResponse()) // .expectNext(expectedSftpMessage) // .verifyComplete(); - verify(httpClientMock, times(1)).getDMaaPConsumerResponse(Optional.empty()); - verifyNoMoreInteractions(httpClientMock); + verify(messageRouterSubscriber, times(1)) + .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + verifyNoMoreInteractions(messageRouterSubscriber); } private void prepareMocksForDmaapConsumer(Optional<JsonElement> message, FileReadyMessage fileReadyMessageAfterConsume) { - Mono<JsonElement> messageAsMono = message.isPresent() ? Mono.just(message.get()) : Mono.empty(); + Flux<JsonElement> messageAsMono = message.isPresent() ? Flux.just(message.get()) : Flux.empty(); + + messageRouterSubscriber = mock(MessageRouterSubscriber.class); + dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), + messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class)); + JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class); - httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class); - when(httpClientMock.getDMaaPConsumerResponse(Optional.empty())).thenReturn(messageAsMono); + when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest())) + .thenReturn(messageAsMono); when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration); - ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class); - try { - doReturn(httpClientMock).when(httpClientFactory).create(dmaapConsumerConfiguration.toDmaap()); - } catch (DatafileTaskException e) { - e.printStackTrace(); - } if (message.isPresent()) { when(jsonMessageParserMock.getMessagesFromJson(any())).thenReturn(Flux.just(fileReadyMessageAfterConsume)); @@ -268,7 +264,7 @@ public class DMaaPMessageConsumerTest { .thenReturn(Flux.error(new DatafileTaskException("problemas"))); } - messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock, httpClientFactory)); + messageConsumer = spy(new DMaaPMessageConsumer(appConfig, jsonMessageParserMock)); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java index 1cb79bcf..199ac9f6 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java @@ -52,7 +52,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; -import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; @@ -66,6 +65,9 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage; import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData; import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData; import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; import org.slf4j.MDC; @@ -110,7 +112,7 @@ public class ScheduledTasksTest { .publishUrl(publishUrl) // .logUrl("") // .userName("userName") // - .passWord("passWord") // + .password("passWord") // .trustStorePath("trustStorePath") // .trustStorePasswordPath("trustStorePasswordPath") // .keyStorePath("keyStorePath") // @@ -118,13 +120,10 @@ public class ScheduledTasksTest { .enableDmaapCertAuth(true) // .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // - final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() // - .topicUrl("topicUrl").trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); + final ConsumerConfiguration dmaapConsumerConfiguration = + new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class), + mock(MessageRouterSubscribeRequest.class)); + doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER); doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration(); @@ -266,7 +265,7 @@ public class ScheduledTasksTest { .publishUrl(publishUrl) // .logUrl("") // .userName("userName") // - .passWord("passWord") // + .password("passWord") // .trustStorePath("trustStorePath") // .trustStorePasswordPath("trustStorePasswordPath") // .keyStorePath("keyStorePath") // @@ -274,13 +273,9 @@ public class ScheduledTasksTest { .enableDmaapCertAuth(true) // .changeIdentifier("Different changeIdentifier") // .build(); // - final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() // - .topicUrl("topicUrl").trustStorePath("trustStorePath") // - .trustStorePasswordPath("trustStorePasswordPath") // - .keyStorePath("keyStorePath") // - .keyStorePasswordPath("keyStorePasswordPath") // - .enableDmaapCertAuth(true) // - .build(); + final ConsumerConfiguration dmaapConsumerConfiguration = + new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class), + mock(MessageRouterSubscribeRequest.class)); doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER); doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration(); diff --git a/datafile-app-server/src/test/resources/cert.jks b/datafile-app-server/src/test/resources/cert.jks Binary files differnew file mode 100755 index 00000000..ff0e95ce --- /dev/null +++ b/datafile-app-server/src/test/resources/cert.jks diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test.json b/datafile-app-server/src/test/resources/datafile_endpoints_test.json index 62119e61..8e51b807 100644 --- a/datafile-app-server/src/test/resources/datafile_endpoints_test.json +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test.json @@ -5,11 +5,14 @@ "dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass", "dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks", "dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass", - "dmaap.security.trustStorePath": "trustStorePath", - "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath", - "dmaap.security.keyStorePath": "keyStorePath", - "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath", + "dmaap.security.trustStorePath": "src/test/resources/trust.jks", + "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass", + "dmaap.security.keyStorePath": "src/test/resources/cert.jks", + "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass", "dmaap.security.enableDmaapCertAuth": "true", + "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12", + "dmaap.dmaapConsumerConfiguration.consumerId": "C12", + "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000, "sftp.security.strictHostKeyChecking": "false", "streams_publishes": { "PM_MEAS_FILES": { @@ -27,7 +30,7 @@ "streams_subscribes": { "dmaap_subscriber": { "dmaap_info": { - "topic_url": "http://dradmin:dradmin@localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + "topic_url": "http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT" }, "type": "message_router" } diff --git a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json index 480a6f79..e1327462 100644 --- a/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json +++ b/datafile-app-server/src/test/resources/datafile_endpoints_test_2producers.json @@ -5,12 +5,15 @@ "dmaap.ftpesConfig.keyPasswordPath": "/src/test/resources/dfc.jks.pass", "dmaap.ftpesConfig.trustedCa": "/src/test/resources/ftp.jks", "dmaap.ftpesConfig.trustedCaPasswordPath": "/src/test/resources/ftp.jks.pass", - "dmaap.security.trustStorePath": "trustStorePath", - "dmaap.security.trustStorePasswordPath": "trustStorePasswordPath", - "dmaap.security.keyStorePath": "keyStorePath", - "dmaap.security.keyStorePasswordPath": "keyStorePasswordPath", + "dmaap.security.trustStorePath": "src/test/resources/trust.jks", + "dmaap.security.trustStorePasswordPath": "src/test/resources/trust.pass", + "dmaap.security.keyStorePath": "src/test/resources/cert.jks", + "dmaap.security.keyStorePasswordPath": "src/test/resources/jks.pass", "dmaap.security.enableDmaapCertAuth": "true", "sftp.security.strictHostKeyChecking": "false", + "dmaap.dmaapConsumerConfiguration.consumerGroup": "OpenDcae-c12", + "dmaap.dmaapConsumerConfiguration.consumerId": "C12", + "dmaap.dmaapConsumerConfiguration.timeoutMs": 1000, "streams_publishes": { "PM_MEAS_FILES": { "type": "data_router", @@ -49,7 +52,7 @@ "streams_subscribes": { "dmaap_subscriber": { "dmaap_info": { - "topic_url": "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" + "topic_url": "http://message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12" }, "type": "message_router" } diff --git a/datafile-app-server/src/test/resources/jks.pass b/datafile-app-server/src/test/resources/jks.pass new file mode 100755 index 00000000..b2c3df47 --- /dev/null +++ b/datafile-app-server/src/test/resources/jks.pass @@ -0,0 +1 @@ +hD:!w:CxF]lGvM6Mz9l^j[7U
\ No newline at end of file diff --git a/datafile-app-server/src/test/resources/trust.jks b/datafile-app-server/src/test/resources/trust.jks Binary files differnew file mode 100755 index 00000000..fc62ad2f --- /dev/null +++ b/datafile-app-server/src/test/resources/trust.jks diff --git a/datafile-app-server/src/test/resources/trust.pass b/datafile-app-server/src/test/resources/trust.pass new file mode 100755 index 00000000..047a411a --- /dev/null +++ b/datafile-app-server/src/test/resources/trust.pass @@ -0,0 +1 @@ +jeQ2l]iyB62D{WbSHL]dN*8R
\ No newline at end of file |