diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh')
7 files changed, 110 insertions, 159 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index 3445c076..84d9fcd2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -28,6 +28,7 @@ import org.slf4j.MDC; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; @@ -42,6 +43,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVari @SpringBootApplication(exclude = {JacksonAutoConfiguration.class}) @Configuration @EnableScheduling +@EnableConfigurationProperties public class MainApp { public static void main(String[] args) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigFileReader.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigFileReader.java deleted file mode 100644 index f481f4ca..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigFileReader.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import com.google.gson.Gson; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; - -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.io.Resource; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; - -@Component -public class CbsClientConfigFileReader { - - private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigFileReader.class); - - private final Resource cbsClientConfigFile; - - public CbsClientConfigFileReader(@Value("classpath:cbs_client_config.json") Resource cbsClientConfigFile) { - this.cbsClientConfigFile = cbsClientConfigFile; - } - - public Mono<CbsClientConfiguration> readConfig() { - LOGGER.debug("Loading CBS client configuration from configuration file"); - try (InputStream inputStream = cbsClientConfigFile.getInputStream()) { - CbsClientConfiguration config = new Gson().fromJson( - new InputStreamReader(inputStream, StandardCharsets.UTF_8), ImmutableCbsClientConfiguration.class); - LOGGER.info("Evaluated variables: {}", config); - return Mono.just(config); - } catch (Exception e) { - return Mono.error(new RuntimeException("Failed to load/parse CBS client configuration file", e)); - } - } - -} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigurationResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigurationResolver.java deleted file mode 100644 index ce4cd4ed..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsClientConfigurationResolver.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; - -@Component -public class CbsClientConfigurationResolver { - - private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationResolver.class); - private final CbsClientConfigFileReader cbsClientConfigFileReader; - - public CbsClientConfigurationResolver(CbsClientConfigFileReader cbsClientConfigFileReader) { - this.cbsClientConfigFileReader = cbsClientConfigFileReader; - } - - Mono<CbsClientConfiguration> resolveCbsClientConfiguration() { - return Mono.fromSupplier(CbsClientConfiguration::fromEnvironment) - .doOnError(err -> LOGGER.warn("Failed resolving CBS client configuration from system environments", err)) - .onErrorResume(err -> cbsClientConfigFileReader.readConfig()); - } - -} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java new file mode 100644 index 00000000..fc4b9dff --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.context.environment.EnvironmentChangeEvent; +import org.springframework.cloud.context.refresh.ContextRefresher; +import org.springframework.context.event.EventListener; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Duration; + +@Component +public class CbsConfigRefreshScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfigRefreshScheduler.class); + private static final String CBS_UPDATES_INTERVAL_PROPERTY = "cbs.updates-interval"; + private static final Duration NO_UPDATES = Duration.ZERO; + + private final ContextRefresher contextRefresher; + private final Environment environment; + private final Scheduler scheduler; + private transient Disposable refreshEventsStreamHandle; + + + public CbsConfigRefreshScheduler(ContextRefresher contextRefresher, Environment environment) { + this.contextRefresher = contextRefresher; + this.environment = environment; + this.scheduler = Schedulers.newElastic("conf-updates"); + } + + @PostConstruct + public void startPollingForCbsUpdates() { + startPollingForCbsUpdates(getCbsUpdatesInterval()); + } + + private void startPollingForCbsUpdates(Duration updatesInterval) { + if (!updatesInterval.equals(NO_UPDATES)) { + LOGGER.info("Configuring pulling for CBS updates in every {}", updatesInterval); + refreshEventsStreamHandle = Flux.interval(updatesInterval, scheduler) + .doOnNext(i -> { + LOGGER.debug("Requesting context refresh"); + contextRefresher.refresh(); + }) + .onErrorContinue((e, o) -> LOGGER.error("Failed fetching config updates from CBS", e)) + .subscribe(); + } + } + + @EventListener + public void onEnvironmentChanged(EnvironmentChangeEvent event) { + if (event.getKeys().contains(CBS_UPDATES_INTERVAL_PROPERTY)) { + LOGGER.info("CBS config polling interval changed to {}", environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY)); + stopPollingForCbsUpdates(); + startPollingForCbsUpdates(getCbsUpdatesInterval()); + } + } + + private Duration getCbsUpdatesInterval() { + return environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY, Duration.class, NO_UPDATES); + } + + @PreDestroy + private void stopPollingForCbsUpdates() { + if(refreshEventsStreamHandle != null) { + LOGGER.debug("Stopping pulling for CBS updates"); + refreshEventsStreamHandle.dispose(); + } + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java index 1f752733..c1226359 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -22,24 +22,17 @@ package org.onap.dcaegen2.services.prh.configuration; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.annotation.Configuration; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; import java.util.Optional; -@Configuration + public class CbsConfiguration implements Config { private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class); private static final String CBS_CONFIG_MISSING = "CBS config missing"; @@ -50,26 +43,8 @@ public class CbsConfiguration implements Config { private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest; private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest; - private final CbsClientConfigurationResolver cbsClientConfigurationResolver; - - public CbsConfiguration(CbsClientConfigurationResolver cbsClientConfigurationResolver) { - this.cbsClientConfigurationResolver = cbsClientConfigurationResolver; - } - - public void runTask() { - Flux.defer(cbsClientConfigurationResolver::resolveCbsClientConfiguration) - .subscribeOn(Schedulers.parallel()) - .subscribe(this::parsingConfigSuccess, this::parsingConfigError); - } - private void parsingConfigSuccess(CbsClientConfiguration cbsClientConfiguration) { - LOGGER.debug("Fetching PRH configuration from Consul"); - CbsClientFactory.createCbsClient(cbsClientConfiguration) - .flatMap(cbsClient -> cbsClient.get(CbsRequests.getAll(RequestDiagnosticContext.create()))) - .subscribe(this::parseCBSConfig, this::cbsConfigError); - } - - private void parseCBSConfig(JsonObject jsonObject) { + public void parseCBSConfig(JsonObject jsonObject) { LOGGER.info("Received application configuration: {}", jsonObject); CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject); @@ -85,14 +60,6 @@ public class CbsConfiguration implements Config { messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); } - private void parsingConfigError(Throwable throwable) { - LOGGER.warn("Failed to process system environments", throwable); - } - - private void cbsConfigError(Throwable throwable) { - LOGGER.warn("Failed to gather configuration from ConfigBindingService/Consul", throwable); - } - @Override public MessageRouterPublisher getMessageRouterPublisher() { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 71d707bc..31794f6b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -23,26 +23,28 @@ package org.onap.dcaegen2.services.prh.configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Configuration; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; import org.springframework.core.io.Resource; +import org.springframework.stereotype.Component; import org.springframework.util.StreamUtils; -import javax.annotation.PostConstruct; import java.io.IOException; import java.nio.charset.Charset; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 */ -@Configuration +@Component public class PrhAppConfig { private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); @Value("classpath:git_info.json") private Resource gitInfo; - @PostConstruct - private void printGitInfo() throws IOException { + + @EventListener + public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) throws IOException { if(LOGGER.isDebugEnabled()) { LOGGER.debug("Git info={}", StreamUtils.copyToString(gitInfo.getInputStream(), Charset.defaultCharset())); } @@ -51,4 +53,5 @@ public class PrhAppConfig { public Resource getGitInfo() { return gitInfo; } + } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index bc13ddc4..c3eaa12f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -21,18 +21,15 @@ package org.onap.dcaegen2.services.prh.tasks; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; import javax.annotation.PostConstruct; -import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; @@ -44,24 +41,17 @@ import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class ScheduledTasksRunner { - private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10; - private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunner.class); private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final CbsConfiguration cbsConfiguration; - @Autowired - public ScheduledTasksRunner(TaskScheduler taskScheduler, - ScheduledTasks scheduledTask, - CbsConfiguration cbsConfiguration) { + public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.cbsConfiguration = cbsConfiguration; } /** @@ -82,9 +72,6 @@ public class ScheduledTasksRunner { LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { scheduledPrhTaskFutureList.add(taskScheduler - .scheduleAtFixedRate(cbsConfiguration::runTask, Instant.now(), - Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); - scheduledPrhTaskFutureList.add(taskScheduler .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, Duration.ofSeconds(SCHEDULING_DELAY_FOR_PRH_TASKS))); return true; |