diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-09-19 12:55:18 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-09-20 12:50:32 +0200 |
commit | 3468d474187ef01546bdf1180d11453a4f924d31 (patch) | |
tree | 79aafcd9b4f7d36d27d0412f0ac3946c0ad79034 | |
parent | 2d1ea513ca0386102f2f9c11bd5c76f37939113a (diff) |
Loading configuration from consul/cbs
*Registered task which calling cbs/consul
for configuration - fixedRate 5 minutes
*Added workflow for loading config from cloud
Change-Id: Iba36d18b4ee0dca082612fa4c92c877f71c9b1fe
Issue-ID: DCAEGEN2-784
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
20 files changed, 858 insertions, 65 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java new file mode 100644 index 00000000..7570d704 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +public class CloudConfigParser { + + private final JsonObject jsonObject; + + CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject; + } + + DmaapPublisherConfiguration getDmaapPublisherConfig() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) + .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) + .build(); + } + + DmaapConsumerConfiguration getDmaapConsumerConfig() { + return new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt()) + .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) + .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) + .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .build(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java new file mode 100644 index 00000000..7bf711bc --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.service.DatafileConfigurationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableScheduling; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Configuration +@EnableConfigurationProperties +@EnableScheduling +@Primary +public class CloudConfiguration extends AppConfig { + + private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); + + private DatafileConfigurationProvider datafileConfigurationProvider; + private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; + private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; + + @Value("#{systemEnvironment}") + private Properties systemEnvironment; + + @Autowired + public void setThreadPoolTaskScheduler(DatafileConfigurationProvider datafileConfigurationProvider) { + this.datafileConfigurationProvider = datafileConfigurationProvider; + } + + + protected void runTask() { + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) + .subscribeOn(Schedulers.parallel()) + .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + } + + private void parsingConfigError(Throwable throwable) { + logger.warn("Error in case of processing system environment, more details below: ", throwable); + } + + private void cloudConfigError(Throwable throwable) { + logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + } + + private void parsingConfigSuccess(EnvProperties envProperties) { + logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul"); + datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties) + .subscribe(this::parseCloudConfig, this::cloudConfigError); + } + + private void parseCloudConfig(JsonObject jsonObject) { + logger.info("Received application configuration: {}", jsonObject); + CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); + dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); + dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + } + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java new file mode 100644 index 00000000..9f6b2737 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +class EnvironmentProcessor { + + private static final int DEFAULT_CONSUL_PORT = 8500; + private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + + private EnvironmentProcessor() { + } + + static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + logger.info("Loading configuration from system environment variables"); + EnvProperties envProperties; + try { + envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) + .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) + .appName(getService(systemEnvironment)).build(); + } catch (EnvironmentLoaderException e) { + return Mono.error(e); + } + logger.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); + } + + private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) + .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); + } + + private static Integer getConsultPort(Properties systemEnvironments) { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) + .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); + } + + private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) + .orElseThrow( + () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); + } + + private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) + .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) + .orElseThrow(() -> new EnvironmentLoaderException( + "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); + } + + private static Integer getDefaultPortOfConsul() { + logger.warn("$CONSUL_PORT environment has not been defined"); + logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + return DEFAULT_CONSUL_PORT; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 1d0a192f..512a2178 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,6 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; @@ -40,16 +42,20 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig extends DatafileAppConfig { - private static final int SCHEDULING_DELAY = 2000; + private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 10; + private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<ScheduledFuture>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; + private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; + this.cloudConfiguration = cloudConfiguration; } /** @@ -62,7 +68,7 @@ public class SchedulerConfig extends DatafileAppConfig { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } /** @@ -74,8 +80,11 @@ public class SchedulerConfig extends DatafileAppConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { + scheduledFutureList.add(taskScheduler + .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, - SCHEDULING_DELAY)); + Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS))); return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java new file mode 100644 index 00000000..75c2e8a8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +public class EnvironmentLoaderException extends Exception { + + public EnvironmentLoaderException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java index 0f03b1a4..1e2dcc91 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.commons.io.FilenameUtils; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java new file mode 100644 index 00000000..86549343 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.value.Value; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Value.Immutable(prehash = true) +public interface EnvProperties { + + @Value.Parameter + String consulHost(); + + @Value.Parameter + Integer consulPort(); + + @Value.Parameter + String cbsName(); + + @Value.Parameter + String appName(); + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 948976b6..48c4896f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -16,22 +16,28 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.service; +package org.onap.dcaegen2.collectors.datafile.model; +import org.immutables.gson.Gson; import org.immutables.value.Value; /** * Contains data, from the fileReady event, about the file to collect from the xNF. * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - * */ @Value.Immutable +@Gson.TypeAdapters public interface FileData { - public String changeIdentifier(); - public String changeType(); - public String location(); - public String compression(); - public String fileFormatType(); - public String fileFormatVersion(); + String changeIdentifier(); + + String changeType(); + + String location(); + + String compression(); + + String fileFormatType(); + + String fileFormatVersion(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java new file mode 100644 index 00000000..aca87e5b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Service +public class DatafileConfigurationProvider { + + private static final Logger logger = LoggerFactory.getLogger(DatafileConfigurationProvider.class); + + private final HttpGetClient httpGetClient; + + DatafileConfigurationProvider() { + this(new HttpGetClient()); + } + + DatafileConfigurationProvider(HttpGetClient httpGetClient) { + this.httpGetClient = httpGetClient; + } + + public Mono<JsonObject> callForDataFileCollectorConfiguration(EnvProperties envProperties) { + return callConsulForConfigBindingServiceEndpoint(envProperties) + .flatMap(this::callConfigBindingServiceForDatafileConfiguration); + } + + private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { + logger.info("Retrieving Config Binding Service endpoint from Consul"); + return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) + .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName())); + + } + + private String getConsulUrl(EnvProperties envProperties) { + return getUri(envProperties.consulHost(), envProperties.consulPort(), "/v1/catalog/service", + envProperties.cbsName()); + } + + private Mono<JsonObject> callConfigBindingServiceForDatafileConfiguration(String configBindingServiceUri) { + logger.info("Retrieving Datafile configuration"); + return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); + } + + + private Mono<String> createConfigBindingServiceUrl(JsonArray jsonArray, String appName) { + return getConfigBindingObject(jsonArray) + .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName)); + } + + private Mono<String> buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) { + return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), + jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); + } + + private Mono<JsonObject> getConfigBindingObject(JsonArray jsonArray) { + try { + if (jsonArray.size() > 0) { + return Mono.just(jsonArray.get(0).getAsJsonObject()); + } else { + throw new IllegalStateException("JSON Array was empty"); + } + } catch (IllegalStateException e) { + logger.warn("Failed to retrieve JSON Object from array", e); + return Mono.error(e); + } + } + + private String getUri(String host, Integer port, String... paths) { + return new DefaultUriBuilderFactory().builder() + .scheme("http") + .host(host) + .port(port) + .path(String.join("/", paths)) + .build().toString(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index c71d1435..72e7d497 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -30,6 +30,8 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @@ -42,7 +44,7 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerJsonParser { - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); private static final String EVENT = "event"; private static final String NOTIFICATION_FIELDS = "notificationFields"; @@ -59,8 +61,7 @@ public class DmaapConsumerJsonParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; /** - * Extract info from string and create @see - * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}. + * Extract info from string and create @see {@link FileData}. * * @param monoMessage - results from DMaaP * @return reactive Mono with an array of FileData @@ -71,17 +72,17 @@ public class DmaapConsumerJsonParser { private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + : getFileDataFromJsonArray(jsonElement); } private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -90,8 +91,8 @@ public class DmaapConsumerJsonParser { private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); } private Mono<List<FileData>> transform(JsonObject jsonObject) { @@ -103,28 +104,28 @@ public class DmaapConsumerJsonParser { JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) - && arrayOfNamedHashMap != null) { + && arrayOfNamedHashMap != null) { Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); return res; } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { return Mono.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); + new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { return Mono.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); + new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { if (arrayOfAdditionalFields.get(i) != null) { @@ -134,7 +135,7 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - LOGGER.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); } } } @@ -152,10 +153,10 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build(); } return fileData; } @@ -165,20 +166,20 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return ((changeIdentifier != null && !changeIdentifier.isEmpty()) - && (changeType != null && !changeType.isEmpty()) - && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); + && (changeType != null && !changeType.isEmpty()) + && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { return ((fileFormatVersion != null && !fileFormatVersion.isEmpty()) - && (fileFormatType != null && !fileFormatType.isEmpty())); + && (fileFormatType != null && !fileFormatType.isEmpty())); } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty()) - && (compression != null && !compression.isEmpty()); + && (compression != null && !compression.isEmpty()); } private boolean containsHeader(JsonObject jsonObject) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java new file mode 100644 index 00000000..796dbbd8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + + +public class HttpGetClient { + + private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + + private final WebClient webClient; + private final Gson gson; + + HttpGetClient() { + this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); + } + + HttpGetClient(WebClient webClient) { + this.webClient = webClient; + this.gson = new Gson(); + } + + <T> Mono<T> callHttpGet(String url, Class<T> genericClassDeclaration) { + return webClient + .get() + .uri(url) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) + .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) + .bodyToMono(String.class) + .flatMap(body -> getJsonFromRequest(body, genericClassDeclaration)); + } + + private RuntimeException getException(ClientResponse response) { + return new RuntimeException(String.format("Request for cloud config failed: HTTP %d", + response.statusCode().value())); + } + + private <T> Mono<T> getJsonFromRequest(String body, Class<T> genericClassDeclaration) { + try { + return Mono.just(parseJson(body, genericClassDeclaration)); + } catch (JsonSyntaxException | IllegalStateException e) { + return Mono.error(e); + } + } + + private <T> T parseJson(String body, Class<T> genericClassDeclaration) { + return gson.fromJson(body, genericClassDeclaration); + } + + private static ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private static ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 30bf536e..0c76fc17 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -25,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index fdd1bb49..839e03c9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +55,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; @@ -79,7 +79,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); Mono<List<FileData>> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); + consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); return consumerResult.flatMap(this::getFilesFromSender); } @@ -95,8 +95,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override protected DmaapConsumerReactiveHttpClient resolveClient() { - return dmaaPConsumerReactiveHttpClient == null - ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaaPConsumerReactiveHttpClient; + return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 8c4d7072..5779051c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -49,15 +49,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { + public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) { logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); } @Override public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { + throws DatafileTaskException { if (consumerDmaapModels == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } @@ -73,8 +72,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { @Override DmaapProducerReactiveHttpClient resolveClient() { - return dmaapProducerReactiveHttpClient == null - ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaapProducerReactiveHttpClient; + return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } + } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java new file mode 100644 index 00000000..a4f098be --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java @@ -0,0 +1,104 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; + +class CloudConfigParserTest { + + private static final String correctJson = + "{\"dmaap.dmaapProducerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", " + + "\"dmaap.dmaapConsumerConfiguration.timeoutMS\": -1," + + " \"dmaap.dmaapConsumerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\"," + + "\"dmaap.dmaapConsumerConfiguration.dmaapUserName\": \"admin\", " + + "\"dmaap.dmaapProducerConfiguration.dmaapPortNumber\": 3904, " + + "\"dmaap.dmaapConsumerConfiguration.dmaapUserPassword\": \"admin\", " + + "\"dmaap.dmaapProducerConfiguration.dmaapProtocol\": \"http\", " + + "\"dmaap.dmaapProducerConfiguration.dmaapContentType\": \"application/json\", " + + "\"dmaap.dmaapConsumerConfiguration.dmaapTopicName\": \"/events/unauthenticated.VES_NOTIFICATION_OUTPUT\", " + + "\"dmaap.dmaapConsumerConfiguration.dmaapPortNumber\": 3904, " + + "\"dmaap.dmaapConsumerConfiguration.dmaapContentType\": \"application/json\", " + + "\"dmaap.dmaapConsumerConfiguration.messageLimit\": -1, " + + "\"dmaap.dmaapConsumerConfiguration.dmaapProtocol\": \"http\", " + + "\"dmaap.dmaapConsumerConfiguration.consumerId\": \"c12\"," + + "\"dmaap.dmaapProducerConfiguration.dmaapHostName\": \"message-router.onap.svc.cluster.local\", " + + "\"dmaap.dmaapConsumerConfiguration.consumerGroup\": \"OpenDCAE-c12\", " + + "\"dmaap.dmaapProducerConfiguration.dmaapUserName\": \"admin\", " + + "\"dmaap.dmaapProducerConfiguration.dmaapUserPassword\": \"admin\"}"; + + private static final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig = + new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMS(-1) + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("admin") + .dmaapUserPassword("admin") + .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") + .dmaapPortNumber(3904) + .dmaapContentType("application/json") + .messageLimit(-1) + .dmaapProtocol("http") + .consumerId("c12") + .consumerGroup("OpenDCAE-c12") + .build(); + + private static final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig = + new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") + .dmaapUserPassword("admin") + .dmaapPortNumber(3904) + .dmaapProtocol("http") + .dmaapContentType("application/json") + .dmaapHostName("message-router.onap.svc.cluster.local") + .dmaapUserName("admin") + .build(); + + private CloudConfigParser cloudConfigParser = new CloudConfigParser( + new Gson().fromJson(correctJson, JsonObject.class)); + + + @Test + public void shouldCreateDmaapConsumerConfigurationCorrectly() { + // when + DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig(); + + // then + assertThat(dmaapConsumerConfig).isNotNull(); + assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig); + } + + + @Test + public void shouldCreateDmaapPublisherConfigurationCorrectly() { + // when + DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig(); + + // then + assertThat(dmaapPublisherConfig).isNotNull(); + assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java index 5b9d0aaf..2f61ac97 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectorTest.java @@ -28,14 +28,13 @@ import java.util.List; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.FileData; -import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import reactor.core.publisher.Mono; /** * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - * */ public class FileCollectorTest { @@ -64,15 +63,15 @@ public class FileCollectorTest { public void whenSingleFtpesFile_returnCorrectResponse() { List<FileData> listOfFileData = new ArrayList<FileData>(); listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); + .changeType(FILE_READY_CHANGE_TYPE).location(FTPES_LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22) - .userId("").password("").build(); + .userId("").password("").build(); when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true); Mono<List<ConsumerDmaapModel>> consumerModelsMono = - fileCollectorUndetTest.getFilesFromSender(listOfFileData); + fileCollectorUndetTest.getFilesFromSender(listOfFileData); List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block(); assertEquals(1, consumerModels.size()); @@ -82,7 +81,7 @@ public class FileCollectorTest { assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion()); assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation()); FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS) - .userId("").password("").port(PORT_22).build(); + .userId("").password("").port(PORT_22).build(); verify(ftpsClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(ftpsClientMock); } @@ -91,15 +90,15 @@ public class FileCollectorTest { public void whenSingleSftpFile_returnCorrectResponse() { List<FileData> listOfFileData = new ArrayList<FileData>(); listOfFileData.add(ImmutableFileData.builder().changeIdentifier(PM_MEAS_CHANGE_IDINTIFIER) - .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION) - .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); + .changeType(FILE_READY_CHANGE_TYPE).location(SFTP_LOCATION).compression(GZIP_COMPRESSION) + .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE).fileFormatVersion(FILE_FORMAT_VERSION).build()); FileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS).port(PORT_22) - .userId("").password("").build(); + .userId("").password("").build(); when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION)).thenReturn(true); Mono<List<ConsumerDmaapModel>> consumerModelsMono = - fileCollectorUndetTest.getFilesFromSender(listOfFileData); + fileCollectorUndetTest.getFilesFromSender(listOfFileData); List<ConsumerDmaapModel> consumerModels = consumerModelsMono.block(); assertEquals(1, consumerModels.size()); @@ -109,7 +108,7 @@ public class FileCollectorTest { assertEquals(FILE_FORMAT_VERSION, consumerDmaapModel.getFileFormatVersion()); assertEquals(LOCAL_FILE_LOCATION, consumerDmaapModel.getLocation()); FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress(SERVER_ADDRESS) - .userId("").password("").port(PORT_22).build(); + .userId("").password("").port(PORT_22).build(); verify(sftpClientMock, times(1)).collectFile(expectedFileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); verifyNoMoreInteractions(ftpsClientMock); } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java new file mode 100644 index 00000000..efd89837 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProviderTest.java @@ -0,0 +1,90 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class DatafileConfigurationProviderTest { + private static final Gson gson = new Gson(); + private static final String configBindingService = "[{\"ID\":\"9c8dd674-34ce-7049-d318-e98d93a64303\",\"Node\"" + + ":\"dcae-bootstrap\",\"Address\":\"10.42.52.82\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":" + + "{\"lan\":\"10.42.52.82\",\"wan\":\"10.42.52.82\"},\"NodeMeta\":{\"consul-network-segment\":\"\"}," + + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[]," + + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false," + + "\"CreateIndex\":14352,\"ModifyIndex\":14352},{\"ID\":\"35c6f540-a29c-1a92-23b0-1305bd8c81f5\",\"Node\":" + + "\"dev-consul-server-1\",\"Address\":\"10.42.165.51\",\"Datacenter\":\"dc1\",\"TaggedAddresses\":" + + "{\"lan\":\"10.42.165.51\",\"wan\":\"10.42.165.51\"},\"NodeMeta\":{\"consul-network-segment\":\"\"}," + + "\"ServiceID\":\"dcae-cbs1\",\"ServiceName\":\"config-binding-service\",\"ServiceTags\":[]," + + "\"ServiceAddress\":\"config-binding-service\",\"ServicePort\":10000,\"ServiceEnableTagOverride\":false," + + "\"CreateIndex\":803,\"ModifyIndex\":803}]"; + private static final JsonArray configBindingServiceJson = gson.fromJson(configBindingService, JsonArray.class); + private static final JsonArray emptyConfigBindingServiceJson = gson.fromJson("[]", JsonArray.class); + private static final String datafileCollectorMockConfiguration = "{\"test\":1}"; + private static final JsonObject datafileCollectorMockConfigurationJson = gson.fromJson(datafileCollectorMockConfiguration, JsonObject.class); + + private EnvProperties envProperties = ImmutableEnvProperties.builder() + .appName("dcae-datafile-collector") + .cbsName("config-binding-service") + .consulHost("consul") + .consulPort(8500) + .build(); + + @Test + void shouldReturnDatafileCollectorConfiguration() { + // given + HttpGetClient webClient = mock(HttpGetClient.class); + when( + webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class)) + .thenReturn(Mono.just(configBindingServiceJson)); + when(webClient.callHttpGet("http://config-binding-service:10000/service_component/dcae-datafile-collector", JsonObject.class)) + .thenReturn(Mono.just(datafileCollectorMockConfigurationJson)); + + DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient); + + //when/then + StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription() + .expectNext(datafileCollectorMockConfigurationJson).verifyComplete(); + } + + @Test + void shouldReturnMonoErrorWhenConsuleDoesntHaveConfigBindingServiceEntry() { + // given + HttpGetClient webClient = mock(HttpGetClient.class); + when( + webClient.callHttpGet("http://consul:8500/v1/catalog/service/config-binding-service", JsonArray.class)) + .thenReturn(Mono.just(emptyConfigBindingServiceJson)); + + DatafileConfigurationProvider provider = new DatafileConfigurationProvider(webClient); + + //when/then + StepVerifier.create(provider.callForDataFileCollectorConfiguration(envProperties)).expectSubscription() + .expectError(IllegalStateException.class).verify(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java index 4aad5f45..8c36a51f 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java @@ -26,6 +26,8 @@ import java.util.Optional; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; @@ -44,6 +46,7 @@ class DmaapConsumerJsonParserTest { @Test void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException { AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder().name("A20161224.1030-1045.bin.gz") + .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1045.bin.gz").compression("gzip") .fileFormatType("org.3GPP.32.435#measCollec").fileFormatVersion("V10").build(); JsonMessage message = new JsonMessage.JsonMessageBuilder().changeIdentifier("PM_MEAS_FILES") diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java new file mode 100644 index 00000000..d95e3417 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClientTest.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; +import org.junit.jupiter.api.Test; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class HttpGetClientTest { + + private static final String SOMEURL = "http://someurl"; + private static final String DATA = "{}"; + private Gson gson = new Gson(); + private WebClient webClient = mock(WebClient.class); + private WebClient.RequestHeadersUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class); + private WebClient.ResponseSpec responseSpec = mock(WebClient.ResponseSpec.class); + + @Test + void shouldReturnJsonObjectOnGetCall() { + //given + mockWebClientDependantObject(); + HttpGetClient httpGetClient = new HttpGetClient(webClient); + when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just(DATA)); + + //when/then + StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription() + .expectNext(gson.fromJson(DATA, JsonObject.class)).verifyComplete(); + } + + @Test + void shouldReturnMonoErrorOnGetCall() { + //given + mockWebClientDependantObject(); + HttpGetClient httpGetClient = new HttpGetClient(webClient); + when(responseSpec.bodyToMono(String.class)).thenReturn(Mono.just("some wrong data")); + + //when/then + StepVerifier.create(httpGetClient.callHttpGet(SOMEURL, JsonObject.class)).expectSubscription() + .expectError(JsonSyntaxException.class).verify(); + } + + + private void mockWebClientDependantObject() { + doReturn(requestBodyUriSpec).when(webClient).get(); + when(requestBodyUriSpec.uri(SOMEURL)).thenReturn(requestBodyUriSpec); + doReturn(responseSpec).when(requestBodyUriSpec).retrieve(); + doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + doReturn(responseSpec).when(responseSpec).onStatus(any(), any()); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java index c21c5988..e6818453 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java @@ -38,9 +38,9 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.service.FileData; -import org.onap.dcaegen2.collectors.datafile.service.ImmutableFileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage; import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField; |