diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-09-19 12:55:18 +0200 |
---|---|---|
committer | wasala <przemyslaw.wasala@nokia.com> | 2018-09-20 12:50:32 +0200 |
commit | 3468d474187ef01546bdf1180d11453a4f924d31 (patch) | |
tree | 79aafcd9b4f7d36d27d0412f0ac3946c0ad79034 /datafile-app-server/src/main | |
parent | 2d1ea513ca0386102f2f9c11bd5c76f37939113a (diff) |
Loading configuration from consul/cbs
*Registered task which calling cbs/consul
for configuration - fixedRate 5 minutes
*Added workflow for loading config from cloud
Change-Id: Iba36d18b4ee0dca082612fa4c92c877f71c9b1fe
Issue-ID: DCAEGEN2-784
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
Diffstat (limited to 'datafile-app-server/src/main')
14 files changed, 572 insertions, 50 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java new file mode 100644 index 00000000..7570d704 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +public class CloudConfigParser { + + private final JsonObject jsonObject; + + CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject; + } + + DmaapPublisherConfiguration getDmaapPublisherConfig() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) + .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) + .build(); + } + + DmaapConsumerConfiguration getDmaapConsumerConfig() { + return new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt()) + .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) + .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) + .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .build(); + } +}
\ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java new file mode 100644 index 00000000..7bf711bc --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.service.DatafileConfigurationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableScheduling; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Configuration +@EnableConfigurationProperties +@EnableScheduling +@Primary +public class CloudConfiguration extends AppConfig { + + private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); + + private DatafileConfigurationProvider datafileConfigurationProvider; + private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; + private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; + + @Value("#{systemEnvironment}") + private Properties systemEnvironment; + + @Autowired + public void setThreadPoolTaskScheduler(DatafileConfigurationProvider datafileConfigurationProvider) { + this.datafileConfigurationProvider = datafileConfigurationProvider; + } + + + protected void runTask() { + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) + .subscribeOn(Schedulers.parallel()) + .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + } + + private void parsingConfigError(Throwable throwable) { + logger.warn("Error in case of processing system environment, more details below: ", throwable); + } + + private void cloudConfigError(Throwable throwable) { + logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + } + + private void parsingConfigSuccess(EnvProperties envProperties) { + logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul"); + datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties) + .subscribe(this::parseCloudConfig, this::cloudConfigError); + } + + private void parseCloudConfig(JsonObject jsonObject) { + logger.info("Received application configuration: {}", jsonObject); + CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); + dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); + dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + } + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java new file mode 100644 index 00000000..9f6b2737 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +class EnvironmentProcessor { + + private static final int DEFAULT_CONSUL_PORT = 8500; + private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + + private EnvironmentProcessor() { + } + + static Mono<EnvProperties> evaluate(Properties systemEnvironment) { + logger.info("Loading configuration from system environment variables"); + EnvProperties envProperties; + try { + envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) + .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) + .appName(getService(systemEnvironment)).build(); + } catch (EnvironmentLoaderException e) { + return Mono.error(e); + } + logger.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); + } + + private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) + .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); + } + + private static Integer getConsultPort(Properties systemEnvironments) { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) + .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); + } + + private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) + .orElseThrow( + () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); + } + + private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) + .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) + .orElseThrow(() -> new EnvironmentLoaderException( + "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); + } + + private static Integer getDefaultPortOfConsul() { + logger.warn("$CONSUL_PORT environment has not been defined"); + logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + return DEFAULT_CONSUL_PORT; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 1d0a192f..512a2178 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,6 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; @@ -40,16 +42,20 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig extends DatafileAppConfig { - private static final int SCHEDULING_DELAY = 2000; + private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 10; + private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<ScheduledFuture>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; + private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; + this.cloudConfiguration = cloudConfiguration; } /** @@ -62,7 +68,7 @@ public class SchedulerConfig extends DatafileAppConfig { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } /** @@ -74,8 +80,11 @@ public class SchedulerConfig extends DatafileAppConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { + scheduledFutureList.add(taskScheduler + .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, - SCHEDULING_DELAY)); + Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS))); return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java new file mode 100644 index 00000000..75c2e8a8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +public class EnvironmentLoaderException extends Exception { + + public EnvironmentLoaderException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java index 0f03b1a4..1e2dcc91 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.commons.io.FilenameUtils; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java new file mode 100644 index 00000000..86549343 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.value.Value; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Value.Immutable(prehash = true) +public interface EnvProperties { + + @Value.Parameter + String consulHost(); + + @Value.Parameter + Integer consulPort(); + + @Value.Parameter + String cbsName(); + + @Value.Parameter + String appName(); + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java index 948976b6..48c4896f 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -16,22 +16,28 @@ * ============LICENSE_END======================================================================== */ -package org.onap.dcaegen2.collectors.datafile.service; +package org.onap.dcaegen2.collectors.datafile.model; +import org.immutables.gson.Gson; import org.immutables.value.Value; /** * Contains data, from the fileReady event, about the file to collect from the xNF. * * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a> - * */ @Value.Immutable +@Gson.TypeAdapters public interface FileData { - public String changeIdentifier(); - public String changeType(); - public String location(); - public String compression(); - public String fileFormatType(); - public String fileFormatVersion(); + String changeIdentifier(); + + String changeType(); + + String location(); + + String compression(); + + String fileFormatType(); + + String fileFormatVersion(); } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java new file mode 100644 index 00000000..aca87e5b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 9/19/18 + */ +@Service +public class DatafileConfigurationProvider { + + private static final Logger logger = LoggerFactory.getLogger(DatafileConfigurationProvider.class); + + private final HttpGetClient httpGetClient; + + DatafileConfigurationProvider() { + this(new HttpGetClient()); + } + + DatafileConfigurationProvider(HttpGetClient httpGetClient) { + this.httpGetClient = httpGetClient; + } + + public Mono<JsonObject> callForDataFileCollectorConfiguration(EnvProperties envProperties) { + return callConsulForConfigBindingServiceEndpoint(envProperties) + .flatMap(this::callConfigBindingServiceForDatafileConfiguration); + } + + private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { + logger.info("Retrieving Config Binding Service endpoint from Consul"); + return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) + .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName())); + + } + + private String getConsulUrl(EnvProperties envProperties) { + return getUri(envProperties.consulHost(), envProperties.consulPort(), "/v1/catalog/service", + envProperties.cbsName()); + } + + private Mono<JsonObject> callConfigBindingServiceForDatafileConfiguration(String configBindingServiceUri) { + logger.info("Retrieving Datafile configuration"); + return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); + } + + + private Mono<String> createConfigBindingServiceUrl(JsonArray jsonArray, String appName) { + return getConfigBindingObject(jsonArray) + .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName)); + } + + private Mono<String> buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) { + return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), + jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); + } + + private Mono<JsonObject> getConfigBindingObject(JsonArray jsonArray) { + try { + if (jsonArray.size() > 0) { + return Mono.just(jsonArray.get(0).getAsJsonObject()); + } else { + throw new IllegalStateException("JSON Array was empty"); + } + } catch (IllegalStateException e) { + logger.warn("Failed to retrieve JSON Object from array", e); + return Mono.error(e); + } + } + + private String getUri(String host, Integer port, String... paths) { + return new DefaultUriBuilderFactory().builder() + .scheme("http") + .host(host) + .port(port) + .path(String.join("/", paths)) + .build().toString(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index c71d1435..72e7d497 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -30,6 +30,8 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @@ -42,7 +44,7 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerJsonParser { - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); private static final String EVENT = "event"; private static final String NOTIFICATION_FIELDS = "notificationFields"; @@ -59,8 +61,7 @@ public class DmaapConsumerJsonParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; /** - * Extract info from string and create @see - * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}. + * Extract info from string and create @see {@link FileData}. * * @param monoMessage - results from DMaaP * @return reactive Mono with an array of FileData @@ -71,17 +72,17 @@ public class DmaapConsumerJsonParser { private Mono<JsonElement> getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Mono<List<FileData>> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + : getFileDataFromJsonArray(jsonElement); } private Mono<List<FileData>> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { @@ -90,8 +91,8 @@ public class DmaapConsumerJsonParser { private Mono<List<FileData>> create(Mono<JsonObject> jsonObject) { return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); } private Mono<List<FileData>> transform(JsonObject jsonObject) { @@ -103,28 +104,28 @@ public class DmaapConsumerJsonParser { JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) - && arrayOfNamedHashMap != null) { + && arrayOfNamedHashMap != null) { Mono<List<FileData>> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); return res; } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { return Mono.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); + new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { return Mono.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); + new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } private Mono<List<FileData>> getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List<FileData> res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { if (arrayOfAdditionalFields.get(i) != null) { @@ -134,7 +135,7 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - LOGGER.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); } } } @@ -152,10 +153,10 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build(); } return fileData; } @@ -165,20 +166,20 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return ((changeIdentifier != null && !changeIdentifier.isEmpty()) - && (changeType != null && !changeType.isEmpty()) - && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); + && (changeType != null && !changeType.isEmpty()) + && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { return ((fileFormatVersion != null && !fileFormatVersion.isEmpty()) - && (fileFormatType != null && !fileFormatType.isEmpty())); + && (fileFormatType != null && !fileFormatType.isEmpty())); } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty()) - && (compression != null && !compression.isEmpty()); + && (compression != null && !compression.isEmpty()); } private boolean containsHeader(JsonObject jsonObject) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java new file mode 100644 index 00000000..796dbbd8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + + +public class HttpGetClient { + + private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + + private final WebClient webClient; + private final Gson gson; + + HttpGetClient() { + this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); + } + + HttpGetClient(WebClient webClient) { + this.webClient = webClient; + this.gson = new Gson(); + } + + <T> Mono<T> callHttpGet(String url, Class<T> genericClassDeclaration) { + return webClient + .get() + .uri(url) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) + .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) + .bodyToMono(String.class) + .flatMap(body -> getJsonFromRequest(body, genericClassDeclaration)); + } + + private RuntimeException getException(ClientResponse response) { + return new RuntimeException(String.format("Request for cloud config failed: HTTP %d", + response.statusCode().value())); + } + + private <T> Mono<T> getJsonFromRequest(String body, Class<T> genericClassDeclaration) { + try { + return Mono.just(parseJson(body, genericClassDeclaration)); + } catch (JsonSyntaxException | IllegalStateException e) { + return Mono.error(e); + } + } + + private <T> T parseJson(String body, Class<T> genericClassDeclaration) { + return gson.fromJson(body, genericClassDeclaration); + } + + private static ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private static ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 30bf536e..0c76fc17 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -25,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index fdd1bb49..839e03c9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +55,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; @@ -79,7 +79,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); Mono<List<FileData>> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); + consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); return consumerResult.flatMap(this::getFilesFromSender); } @@ -95,8 +95,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override protected DmaapConsumerReactiveHttpClient resolveClient() { - return dmaaPConsumerReactiveHttpClient == null - ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaaPConsumerReactiveHttpClient; + return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 8c4d7072..5779051c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -49,15 +49,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { + public Mono<String> publish(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) { logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); } @Override public Mono<String> execute(Mono<List<ConsumerDmaapModel>> consumerDmaapModels) - throws DatafileTaskException { + throws DatafileTaskException { if (consumerDmaapModels == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } @@ -73,8 +72,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { @Override DmaapProducerReactiveHttpClient resolveClient() { - return dmaapProducerReactiveHttpClient == null - ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaapProducerReactiveHttpClient; + return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } + } |