diff options
author | wasala <przemyslaw.wasala@nokia.com> | 2018-08-21 14:08:05 +0200 |
---|---|---|
committer | Przemyslaw Wasala <przemyslaw.wasala@nokia.com> | 2018-08-21 12:51:51 +0000 |
commit | 76d0e26cf99bf932b9e1aac3f7a6bb231d8bffe7 (patch) | |
tree | 03976571591fb6145a5dcfb051ef0766989d1b77 | |
parent | e625d742db0cf04038081aac0758396eb3961977 (diff) |
Second part loading PRH CONF
*Priority for loading configuration
*Run asynchronus task in parallel which
is responsible for dynamic hot swaping
configuration from CONSUL/CBS
Change-Id: I03ca0458e34eb71404c5ee8263d4cd476e99290b
Issue-ID: DCAEGEN2-696
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
8 files changed, 231 insertions, 27 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index 5fe7c60e..2357e1d2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -27,7 +27,6 @@ import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java index d83c8138..c5c77ec2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java @@ -21,7 +21,6 @@ package org.onap.dcaegen2.services.prh.configuration; import java.util.Optional; - import java.util.function.Predicate; import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; @@ -32,7 +31,6 @@ import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguratio import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; -import org.springframework.stereotype.Component; /** diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java new file mode 100644 index 00000000..808c4a54 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.prh.config.ImmutableAaiClientConfiguration; +import org.onap.dcaegen2.services.prh.config.ImmutableDmaapConsumerConfiguration; +import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguration; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18 + */ +class CloudConfigParser { + + private final JsonObject jsonObject; + + CloudConfigParser(JsonObject jsonObject) { + this.jsonObject = jsonObject; + } + + DmaapPublisherConfiguration getDmaapPublisherConfig() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) + .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) + .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) + .build(); + } + + AaiClientConfiguration getAaiClientConfig() { + return new ImmutableAaiClientConfiguration.Builder() + .aaiHost(jsonObject.get("aai.aaiClientConfiguration.aaiHost").getAsString()) + .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt()) + .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString()) + .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString()) + .aaiIgnoreSslCertificateErrors( + jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean()) + .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString()) + .aaiProtocol(jsonObject.get("aai.aaiClientConfiguration.aaiProtocol").getAsString()) + .aaiBasePath(jsonObject.get("aai.aaiClientConfiguration.aaiBasePath").getAsString()) + .build(); + } + + DmaapConsumerConfiguration getDmaapConsumerConfig() { + return new ImmutableDmaapConsumerConfiguration.Builder() + .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt()) + .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) + .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .build(); + } +}
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index 5b5c038b..82017a9d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -20,7 +20,12 @@ package org.onap.dcaegen2.services.prh.configuration; +import com.google.gson.JsonObject; +import java.util.Optional; import java.util.Properties; +import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration; +import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.prh.model.EnvProperties; import org.onap.dcaegen2.services.prh.service.HttpClientExecutorService; import org.slf4j.Logger; @@ -46,6 +51,10 @@ public class CloudConfiguration extends AppConfig { private Logger logger = LoggerFactory.getLogger(this.getClass()); private HttpClientExecutorService httpClientExecutorService; + private AaiClientConfiguration aaiClientCloudConfiguration; + private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; + private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; + TaskScheduler cloudTaskScheduler; @Value("#{systemEnvironment}") @@ -62,17 +71,45 @@ public class CloudConfiguration extends AppConfig { protected void runTask() { Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) .subscribeOn(Schedulers.parallel()) - .subscribe(this::doOnSucces, this::doOnError); + .subscribe(this::parsingConfigSuccess, this::parsingConfigError); + } + + private void parsingConfigError(Throwable throwable) { + logger.warn("Error in case of processing system environment, more details below: ", throwable); } - private void doOnError(Throwable throwable) { - logger.warn("Error in case of processing system environment.%nMore details below:%n ", throwable); + private void cloudConfigError(Throwable throwable) { + logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable); } - private void doOnSucces(EnvProperties envProperties) { + private void parsingConfigSuccess(EnvProperties envProperties) { logger.info("Fetching PRH configuration from ConfigBindingService/Consul"); Flux.just(httpClientExecutorService.callConsulForConfigBindingServiceEndpoint(envProperties)) - .flatMap(configBindingServiceUri -> httpClientExecutorService.callConfigBindingServiceForPrhConfiguration(envProperties, - configBindingServiceUri)).subscribe(); + .flatMap(configBindingServiceUri -> httpClientExecutorService + .callConfigBindingServiceForPrhConfiguration(envProperties, + configBindingServiceUri)).subscribe(this::parseCloudConfig, this::cloudConfigError); + } + + private void parseCloudConfig(JsonObject jsonObject) { + logger.info("Received application configuration: {}", jsonObject); + CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); + dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); + aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig(); + dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); + } + + @Override + public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { + return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + } + + @Override + public AaiClientConfiguration getAaiClientConfiguration() { + return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration()); + } + + @Override + public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { + return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java index d1905108..d3b6cbb3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java @@ -34,13 +34,14 @@ import reactor.core.publisher.Flux; */ class EnvironmentProcessor { + private static final int DEFAULT_CONSUL_PORT = 8500; private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class); private EnvironmentProcessor() { } static Flux<EnvProperties> evaluate(Properties systemEnvironment) { - logger.info("Loading configuration from system environment variables"); + logger.info("Loading configuration from system environment variables {}", systemEnvironment); EnvProperties envProperties; try { envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) @@ -78,8 +79,8 @@ class EnvironmentProcessor { private static Integer getDefaultPortOfConsul() { logger.warn("$CONSUL_PORT environment has not been defined"); - logger.warn("$CONSUL_PORT variable will be set to default port {}", 8500); - return 8500; + logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT); + return DEFAULT_CONSUL_PORT; } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java index 1bce1c07..8782a180 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java @@ -26,7 +26,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks; import org.springframework.beans.factory.annotation.Autowired; @@ -34,10 +33,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; -import org.springframework.scheduling.support.PeriodicTrigger; import reactor.core.publisher.Mono; /** @@ -49,7 +46,7 @@ public class SchedulerConfig extends CloudConfiguration { private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000; private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1; - private static volatile List<ScheduledFuture> scheduledPrgTaskFutureList = new ArrayList<>(); + private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); private final ConcurrentTaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; @@ -68,8 +65,8 @@ public class SchedulerConfig extends CloudConfiguration { */ @ApiOperation(value = "Get response on stopping task execution") public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { - scheduledPrgTaskFutureList.forEach(x -> x.cancel(false)); - scheduledPrgTaskFutureList.clear(); + scheduledPrhTaskFutureList.forEach(x -> x.cancel(false)); + scheduledPrhTaskFutureList.clear(); return Mono.defer(() -> Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED)) ); @@ -84,11 +81,11 @@ public class SchedulerConfig extends CloudConfiguration { @PostConstruct @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { - if (scheduledPrgTaskFutureList.isEmpty()) { - scheduledPrgTaskFutureList.add(cloudTaskScheduler + if (scheduledPrhTaskFutureList.isEmpty()) { + scheduledPrhTaskFutureList.add(cloudTaskScheduler .scheduleAtFixedRate(super::runTask, Instant.now(), Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); - scheduledPrgTaskFutureList.add(taskScheduler + scheduledPrhTaskFutureList.add(taskScheduler .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS)); return true; } else { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java index 01081f4d..1b69f5fd 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java @@ -20,9 +20,19 @@ package org.onap.dcaegen2.services.prh.service; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import org.onap.dcaegen2.services.prh.model.EnvProperties; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 @@ -31,17 +41,96 @@ import org.springframework.stereotype.Service; @Service public class HttpClientExecutorService { - public String callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { - return null; + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + public Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) { + + return HttpGetClient.callHttpGet( + envProperties.consulHost() + ":" + envProperties.consulPort() + "/v1/catalog/service/" + envProperties + .cbsName()) + .flatMap(this::getJsonArrayFromRequest) + .flatMap(jsonArray -> Mono.just(jsonArray.get(0))) + .flatMap(this::createConfigBindingServiceURL); + + } + + public Publisher<JsonObject> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties, + Mono<String> configBindingServiceUri) { + return HttpGetClient.callHttpGet(configBindingServiceUri + "/service_component/" + envProperties.appName()) + .flatMap(this::getJsonConfiguration); + } + + private Mono<? extends JsonObject> getJsonConfiguration(String body) { + JsonElement jsonElement = new Gson().toJsonTree(body); + try { + return Mono.just(jsonElement.getAsJsonObject()); + } catch (IllegalStateException e) { + return Mono.error(e); + } + } + + private Mono<String> createConfigBindingServiceURL(JsonElement jsonElement) { + JsonObject jsonObject; + try { + jsonObject = jsonElement.getAsJsonObject(); + } catch (IllegalStateException e) { + return Mono.error(e); + } + return Mono.just(jsonObject.get("ServiceAddress").toString() + ":" + jsonObject.get("ServicePort").toString()); } - public Publisher<String> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties, - String configBindingServiceUri) { - return null; + private Mono<? extends JsonArray> getJsonArrayFromRequest(String body) { + JsonElement jsonElement = new Gson().toJsonTree(body); + try { + return Mono.just(jsonElement.getAsJsonArray()); + } catch (IllegalStateException e) { + logger.warn("Converting string to jsonArray threw error: " + e); + return Mono.error(e); + } } private static class HttpGetClient { + private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class); + + private static WebClient webClient; + + private HttpGetClient() { + } + + private static Mono<String> callHttpGet(String url) { + return webClient + .get() + .uri(url) + .retrieve() + .onStatus(HttpStatus::is4xxClientError, response -> + Mono.error(new Exception("Request for cloud config failed: HTTP 400"))) + .onStatus(HttpStatus::is5xxServerError, response -> + Mono.error(new Exception("Request for cloud config failed: HTTP 500"))) + .bodyToMono(String.class); + } + + private static ExchangeFilterFunction logResponse() { + return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> { + logger.info("Response status {}", clientResponse.statusCode()); + return Mono.just(clientResponse); + }); + } + + private static ExchangeFilterFunction logRequest() { + return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> { + logger.info("Request: {} {}", clientRequest.method(), clientRequest.url()); + clientRequest.headers() + .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value))); + return Mono.just(clientRequest); + }); + } + + static { + webClient = WebClient.builder().filter(logRequest()).filter(logResponse()).build(); + } + + } } diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java index 07388478..eb62d3cb 100644 --- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java +++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java @@ -71,7 +71,7 @@ public class DMaaPProducerReactiveHttpClient { .body(BodyInserters.fromObject(consumerDmaapModelMono)) .retrieve() .onStatus(HttpStatus::is4xxClientError, clientResponse -> - Mono.error(new Exception("HTTP 400")) + Mono.error(new Exception("HTTP 400")) ) .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500"))) |