summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt18
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt8
2 files changed, 20 insertions, 6 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 059e8028..22d8000e 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.main
import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
+import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -41,10 +42,25 @@ private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
fun main(args: Array<String>) {
+ val configStateListener = object : ConfigurationStateListener {
+ override fun retrying() {
+ HealthState.INSTANCE.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ }
+ }
+
HealthCheckServer.start(configurationModule.healthCheckPort(args))
configurationModule
- .hvVesConfigurationUpdates(args)
+ .hvVesConfigurationUpdates(args, configStateListener, ServiceContext::mdc)
.publishOn(Schedulers.single(Schedulers.elastic()))
+ .doOnNext {
+ logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
+ HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
+ }
+ .doOnError {
+ logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
+ logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
+ HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
+ }
.doOnNext(::startServer)
.doOnError(::logServerStartFailed)
.neverComplete() // TODO: remove after merging configuration stream with cbs
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index aed4d928..c079cc59 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,8 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.factory.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
@@ -59,11 +58,10 @@ object VesServer {
private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
CollectorFactory(
- AdapterFactory.configurationProvider(config.cbs),
+ config.collector,
AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes,
- HealthState.INSTANCE
+ config.server.maxPayloadSizeBytes
)
private fun logServerStarted(handle: ServerHandle) =