From 3468d474187ef01546bdf1180d11453a4f924d31 Mon Sep 17 00:00:00 2001 From: wasala Date: Wed, 19 Sep 2018 12:55:18 +0200 Subject: Loading configuration from consul/cbs *Registered task which calling cbs/consul for configuration - fixedRate 5 minutes *Added workflow for loading config from cloud Change-Id: Iba36d18b4ee0dca082612fa4c92c877f71c9b1fe Issue-ID: DCAEGEN2-784 Signed-off-by: wasala --- .../datafile/configuration/CloudConfigParser.java | 65 +++++++++++++ .../datafile/configuration/CloudConfiguration.java | 99 ++++++++++++++++++++ .../configuration/EnvironmentProcessor.java | 83 +++++++++++++++++ .../datafile/configuration/SchedulerConfig.java | 17 +++- .../exceptions/EnvironmentLoaderException.java | 29 ++++++ .../collectors/datafile/ftp/FileCollector.java | 2 +- .../collectors/datafile/model/EnvProperties.java | 41 +++++++++ .../collectors/datafile/model/FileData.java | 43 +++++++++ .../service/DatafileConfigurationProvider.java | 102 +++++++++++++++++++++ .../datafile/service/DmaapConsumerJsonParser.java | 47 +++++----- .../collectors/datafile/service/FileData.java | 37 -------- .../collectors/datafile/service/HttpGetClient.java | 91 ++++++++++++++++++ .../datafile/tasks/DmaapConsumerTask.java | 2 +- .../datafile/tasks/DmaapConsumerTaskImpl.java | 12 +-- .../datafile/tasks/DmaapPublisherTaskImpl.java | 10 +- 15 files changed, 601 insertions(+), 79 deletions(-) create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java delete mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java create mode 100644 datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java (limited to 'datafile-app-server/src/main/java/org/onap') diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java new file mode 100644 index 00000000..7570d704 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.ImmutableDmaapPublisherConfiguration; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +public class CloudConfigParser { + + private final JsonObject jsonObject; + + CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject; + } + + DmaapPublisherConfiguration getDmaapPublisherConfig() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) + .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) + .build(); + } + + DmaapConsumerConfiguration getDmaapConsumerConfig() { + return new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMS(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt()) + .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) + .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) + .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .build(); + } +} \ No newline at end of file diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java new file mode 100644 index 00000000..7bf711bc --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfiguration.java @@ -0,0 +1,99 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import com.google.gson.JsonObject; +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.service.DatafileConfigurationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableScheduling; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +@Configuration +@EnableConfigurationProperties +@EnableScheduling +@Primary +public class CloudConfiguration extends AppConfig { + + private static final Logger logger = LoggerFactory.getLogger(CloudConfiguration.class); + + private DatafileConfigurationProvider datafileConfigurationProvider; + private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; + private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; + + @Value("#{systemEnvironment}") + private Properties systemEnvironment; + + @Autowired + public void setThreadPoolTaskScheduler(DatafileConfigurationProvider datafileConfigurationProvider) { + this.datafileConfigurationProvider = datafileConfigurationProvider; + } + + + protected void runTask() { + Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) + .subscribeOn(Schedulers.parallel()) + .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + } + + private void parsingConfigError(Throwable throwable) { + logger.warn("Error in case of processing system environment, more details below: ", throwable); + } + + private void cloudConfigError(Throwable throwable) { + logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); + } + + private void parsingConfigSuccess(EnvProperties envProperties) { + logger.info("Fetching Datafile Collector configuration from ConfigBindingService/Consul"); + datafileConfigurationProvider.callForDataFileCollectorConfiguration(envProperties) + .subscribe(this::parseCloudConfig, this::cloudConfigError); + } + + private void parseCloudConfig(JsonObject jsonObject) { + logger.info("Received application configuration: {}", jsonObject); + CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); + dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); + dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + } + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java new file mode 100644 index 00000000..9f6b2737 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/EnvironmentProcessor.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.configuration; + +import java.util.Optional; +import java.util.Properties; +import org.onap.dcaegen2.collectors.datafile.exceptions.EnvironmentLoaderException; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableEnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +class EnvironmentProcessor { + + private static final int DEFAULT_CONSUL_PORT = 8500; + private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); + + private EnvironmentProcessor() { + } + + static Mono evaluate(Properties systemEnvironment) { + logger.info("Loading configuration from system environment variables"); + EnvProperties envProperties; + try { + envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) + .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) + .appName(getService(systemEnvironment)).build(); + } catch (EnvironmentLoaderException e) { + return Mono.error(e); + } + logger.info("Evaluated environment system variables {}", envProperties); + return Mono.just(envProperties); + } + + private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) + .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); + } + + private static Integer getConsultPort(Properties systemEnvironments) { + return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) + .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); + } + + private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) + .orElseThrow( + () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); + } + + private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { + return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) + .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) + .orElseThrow(() -> new EnvironmentLoaderException( + "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); + } + + private static Integer getDefaultPortOfConsul() { + logger.warn("$CONSUL_PORT environment has not been defined"); + logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + return DEFAULT_CONSUL_PORT; + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java index 1d0a192f..512a2178 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java @@ -16,6 +16,8 @@ package org.onap.dcaegen2.collectors.datafile.configuration; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; @@ -40,16 +42,20 @@ import reactor.core.publisher.Mono; @EnableScheduling public class SchedulerConfig extends DatafileAppConfig { - private static final int SCHEDULING_DELAY = 2000; + private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 10; + private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; private static volatile List scheduledFutureList = new ArrayList(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; + private final CloudConfiguration cloudConfiguration; @Autowired - public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { + public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, + CloudConfiguration cloudConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; + this.cloudConfiguration = cloudConfiguration; } /** @@ -62,7 +68,7 @@ public class SchedulerConfig extends DatafileAppConfig { scheduledFutureList.forEach(x -> x.cancel(false)); scheduledFutureList.clear(); return Mono.defer(() -> Mono - .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); + .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED))); } /** @@ -74,8 +80,11 @@ public class SchedulerConfig extends DatafileAppConfig { @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { if (scheduledFutureList.isEmpty()) { + scheduledFutureList.add(taskScheduler + .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), + Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(scheduledTask::scheduleMainDatafileEventTask, - SCHEDULING_DELAY)); + Duration.ofSeconds(SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS))); return true; } else { return false; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java new file mode 100644 index 00000000..75c2e8a8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/EnvironmentLoaderException.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.exceptions; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +public class EnvironmentLoaderException extends Exception { + + public EnvironmentLoaderException(String message) { + super(message); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java index 0f03b1a4..1e2dcc91 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollector.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.commons.io.FilenameUtils; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java new file mode 100644 index 00000000..86549343 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/EnvProperties.java @@ -0,0 +1,41 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.value.Value; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +@Value.Immutable(prehash = true) +public interface EnvProperties { + + @Value.Parameter + String consulHost(); + + @Value.Parameter + Integer consulPort(); + + @Value.Parameter + String cbsName(); + + @Value.Parameter + String appName(); + +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java new file mode 100644 index 00000000..48c4896f --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START====================================================================== + * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. + * =============================================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END======================================================================== + */ + +package org.onap.dcaegen2.collectors.datafile.model; + +import org.immutables.gson.Gson; +import org.immutables.value.Value; + +/** + * Contains data, from the fileReady event, about the file to collect from the xNF. + * + * @author Henrik Andersson + */ +@Value.Immutable +@Gson.TypeAdapters +public interface FileData { + String changeIdentifier(); + + String changeType(); + + String location(); + + String compression(); + + String fileFormatType(); + + String fileFormatVersion(); +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java new file mode 100644 index 00000000..aca87e5b --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DatafileConfigurationProvider.java @@ -0,0 +1,102 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.collectors.datafile.model.EnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Mono; + +/** + * @author Przemysław Wąsala on 9/19/18 + */ +@Service +public class DatafileConfigurationProvider { + + private static final Logger logger = LoggerFactory.getLogger(DatafileConfigurationProvider.class); + + private final HttpGetClient httpGetClient; + + DatafileConfigurationProvider() { + this(new HttpGetClient()); + } + + DatafileConfigurationProvider(HttpGetClient httpGetClient) { + this.httpGetClient = httpGetClient; + } + + public Mono callForDataFileCollectorConfiguration(EnvProperties envProperties) { + return callConsulForConfigBindingServiceEndpoint(envProperties) + .flatMap(this::callConfigBindingServiceForDatafileConfiguration); + } + + private Mono callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { + logger.info("Retrieving Config Binding Service endpoint from Consul"); + return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class) + .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName())); + + } + + private String getConsulUrl(EnvProperties envProperties) { + return getUri(envProperties.consulHost(), envProperties.consulPort(), "/v1/catalog/service", + envProperties.cbsName()); + } + + private Mono callConfigBindingServiceForDatafileConfiguration(String configBindingServiceUri) { + logger.info("Retrieving Datafile configuration"); + return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class); + } + + + private Mono createConfigBindingServiceUrl(JsonArray jsonArray, String appName) { + return getConfigBindingObject(jsonArray) + .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName)); + } + + private Mono buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) { + return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(), + jsonObject.get("ServicePort").getAsInt(), "/service_component", appName)); + } + + private Mono getConfigBindingObject(JsonArray jsonArray) { + try { + if (jsonArray.size() > 0) { + return Mono.just(jsonArray.get(0).getAsJsonObject()); + } else { + throw new IllegalStateException("JSON Array was empty"); + } + } catch (IllegalStateException e) { + logger.warn("Failed to retrieve JSON Object from array", e); + return Mono.error(e); + } + } + + private String getUri(String host, Integer port, String... paths) { + return new DefaultUriBuilderFactory().builder() + .scheme("http") + .host(host) + .port(port) + .path(String.join("/", paths)) + .build().toString(); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java index c71d1435..72e7d497 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java @@ -30,6 +30,8 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseExcept import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.onap.dcaegen2.collectors.datafile.model.FileData; +import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @@ -42,7 +44,7 @@ import reactor.core.publisher.Mono; */ public class DmaapConsumerJsonParser { - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); + private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); private static final String EVENT = "event"; private static final String NOTIFICATION_FIELDS = "notificationFields"; @@ -59,8 +61,7 @@ public class DmaapConsumerJsonParser { private static final String FILE_FORMAT_VERSION = "fileFormatVersion"; /** - * Extract info from string and create @see - * {@link org.onap.dcaegen2.collectors.datafile.service.FileData}. + * Extract info from string and create @see {@link FileData}. * * @param monoMessage - results from DMaaP * @return reactive Mono with an array of FileData @@ -71,17 +72,17 @@ public class DmaapConsumerJsonParser { private Mono getJsonParserMessage(String message) { return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException()) - : Mono.fromSupplier(() -> new JsonParser().parse(message)); + : Mono.fromSupplier(() -> new JsonParser().parse(message)); } private Mono> createJsonConsumerModel(JsonElement jsonElement) { return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject)) - : getFileDataFromJsonArray(jsonElement); + : getFileDataFromJsonArray(jsonElement); } private Mono> getFileDataFromJsonArray(JsonElement jsonElement) { return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); + .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new))); } public Optional getJsonObjectFromAnArray(JsonElement element) { @@ -90,8 +91,8 @@ public class DmaapConsumerJsonParser { private Mono> create(Mono jsonObject) { return jsonObject.flatMap(monoJsonP -> !containsHeader(monoJsonP) - ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) - : transform(monoJsonP)); + ? Mono.error(new DmaapNotFoundException("Incorrect JsonObject - missing header")) + : transform(monoJsonP)); } private Mono> transform(JsonObject jsonObject) { @@ -103,28 +104,28 @@ public class DmaapConsumerJsonParser { JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP); if (isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion) - && arrayOfNamedHashMap != null) { + && arrayOfNamedHashMap != null) { Mono> res = getAllFileDataFromJson(changeIdentifier, changeType, arrayOfNamedHashMap); return res; } if (!isNotificationFieldsHeaderNotEmpty(changeIdentifier, changeType, notificationFieldsVersion)) { return Mono.error( - new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); + new DmaapNotFoundException("FileReady event header is missing information. " + jsonObject)); } else if (arrayOfNamedHashMap != null) { return Mono.error( - new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); + new DmaapNotFoundException("FileReady event arrayOfNamedHashMap is missing. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); + new DmaapNotFoundException("FileReady event does not contain correct information. " + jsonObject)); } return Mono.error( - new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); + new DmaapNotFoundException("FileReady event has incorrect JsonObject - missing header. " + jsonObject)); } private Mono> getAllFileDataFromJson(String changeIdentifier, String changeType, - JsonArray arrayOfAdditionalFields) { + JsonArray arrayOfAdditionalFields) { List res = new ArrayList<>(); for (int i = 0; i < arrayOfAdditionalFields.size(); i++) { if (arrayOfAdditionalFields.get(i) != null) { @@ -134,7 +135,7 @@ public class DmaapConsumerJsonParser { if (fileData != null) { res.add(fileData); } else { - LOGGER.error("Unable to collect file from xNF. File information wrong. " + fileInfo); + logger.error("Unable to collect file from xNF. File information wrong. " + fileInfo); } } } @@ -152,10 +153,10 @@ public class DmaapConsumerJsonParser { String compression = getValueFromJson(data, COMPRESSION); if (isFileFormatFieldsNotEmpty(fileFormatVersion, fileFormatType) - && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { + && isNameAndLocationAndCompressionNotEmpty(name, location, compression)) { fileData = ImmutableFileData.builder().changeIdentifier(changeIdentifier).changeType(changeType) - .location(location).compression(compression).fileFormatType(fileFormatType) - .fileFormatVersion(fileFormatVersion).build(); + .location(location).compression(compression).fileFormatType(fileFormatType) + .fileFormatVersion(fileFormatVersion).build(); } return fileData; } @@ -165,20 +166,20 @@ public class DmaapConsumerJsonParser { } private boolean isNotificationFieldsHeaderNotEmpty(String changeIdentifier, String changeType, - String notificationFieldsVersion) { + String notificationFieldsVersion) { return ((changeIdentifier != null && !changeIdentifier.isEmpty()) - && (changeType != null && !changeType.isEmpty()) - && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); + && (changeType != null && !changeType.isEmpty()) + && (notificationFieldsVersion != null && !notificationFieldsVersion.isEmpty())); } private boolean isFileFormatFieldsNotEmpty(String fileFormatVersion, String fileFormatType) { return ((fileFormatVersion != null && !fileFormatVersion.isEmpty()) - && (fileFormatType != null && !fileFormatType.isEmpty())); + && (fileFormatType != null && !fileFormatType.isEmpty())); } private boolean isNameAndLocationAndCompressionNotEmpty(String name, String location, String compression) { return (name != null && !name.isEmpty()) && (location != null && !location.isEmpty()) - && (compression != null && !compression.isEmpty()); + && (compression != null && !compression.isEmpty()); } private boolean containsHeader(JsonObject jsonObject) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java deleted file mode 100644 index 948976b6..00000000 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/FileData.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved. - * =============================================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END======================================================================== - */ - -package org.onap.dcaegen2.collectors.datafile.service; - -import org.immutables.value.Value; - -/** - * Contains data, from the fileReady event, about the file to collect from the xNF. - * - * @author Henrik Andersson - * - */ -@Value.Immutable -public interface FileData { - public String changeIdentifier(); - public String changeType(); - public String location(); - public String compression(); - public String fileFormatType(); - public String fileFormatVersion(); -} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java new file mode 100644 index 00000000..796dbbd8 --- /dev/null +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpGetClient.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.service; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + + +public class HttpGetClient { + + private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + + private final WebClient webClient; + private final Gson gson; + + HttpGetClient() { + this(WebClient.builder().filter(logRequest()).filter(logResponse()).build()); + } + + HttpGetClient(WebClient webClient) { + this.webClient = webClient; + this.gson = new Gson(); + } + + Mono callHttpGet(String url, Class genericClassDeclaration) { + return webClient + .get() + .uri(url) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response))) + .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response))) + .bodyToMono(String.class) + .flatMap(body -> getJsonFromRequest(body, genericClassDeclaration)); + } + + private RuntimeException getException(ClientResponse response) { + return new RuntimeException(String.format("Request for cloud config failed: HTTP %d", + response.statusCode().value())); + } + + private Mono getJsonFromRequest(String body, Class genericClassDeclaration) { + try { + return Mono.just(parseJson(body, genericClassDeclaration)); + } catch (JsonSyntaxException | IllegalStateException e) { + return Mono.error(e); + } + } + + private T parseJson(String body, Class genericClassDeclaration) { + return gson.fromJson(body, genericClassDeclaration); + } + + private static ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private static ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } +} diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java index 30bf536e..0c76fc17 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java @@ -25,7 +25,7 @@ import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.springframework.web.reactive.function.client.WebClient; diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java index fdd1bb49..839e03c9 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java @@ -24,7 +24,7 @@ import org.onap.dcaegen2.collectors.datafile.configuration.Config; import org.onap.dcaegen2.collectors.datafile.ftp.FileCollector; import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel; import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.collectors.datafile.service.FileData; +import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.service.consumer.DmaapConsumerReactiveHttpClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +55,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { } protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig, - DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, - DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { + DmaapConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient, + DmaapConsumerJsonParser dmaapConsumerJsonParser, FileCollector fileCollector) { this.datafileAppConfig = datafileAppConfig; this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; @@ -79,7 +79,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { dmaaPConsumerReactiveHttpClient = resolveClient(); logger.trace("Method called with arg {}", object); Mono> consumerResult = - consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); + consume((dmaaPConsumerReactiveHttpClient.getDmaapConsumerResponse())); return consumerResult.flatMap(this::getFilesFromSender); } @@ -95,8 +95,6 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask { @Override protected DmaapConsumerReactiveHttpClient resolveClient() { - return dmaaPConsumerReactiveHttpClient == null - ? new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaaPConsumerReactiveHttpClient; + return new DmaapConsumerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java index 8c4d7072..5779051c 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java @@ -49,15 +49,14 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { } @Override - public Mono publish(Mono> consumerDmaapModels) - throws DatafileTaskException { + public Mono publish(Mono> consumerDmaapModels) { logger.info("Publishing on DMaaP DataRouter {}", consumerDmaapModels); return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModels); } @Override public Mono execute(Mono> consumerDmaapModels) - throws DatafileTaskException { + throws DatafileTaskException { if (consumerDmaapModels == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } @@ -73,8 +72,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask { @Override DmaapProducerReactiveHttpClient resolveClient() { - return dmaapProducerReactiveHttpClient == null - ? new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()) - : dmaapProducerReactiveHttpClient; + return new DmaapProducerReactiveHttpClient(resolveConfiguration()).createDmaapWebClient(buildWebClient()); } + } -- cgit 1.2.3-korg