diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt index 5b0dca2d..f9fd6986 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt @@ -27,12 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog +import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -89,16 +91,15 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie .retryWhen(retry) - private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> { - try { - val sinks = DataStreams.namedSinks(configuration) - .filter { it.type() == "kafka" } - return sinks.map(streamParser::unsafeParse).asSequence() - - } catch (e: NullPointerException) { - throw ParsingException("Failed to parse configuration", e) - } - } + private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> = + try { + DataStreams.namedSinks(configuration) + .filter(streamOfType(KAFKA)) + .map(streamParser::unsafeParse) + .asSequence() + } catch (e: NullPointerException) { + throw ParsingException("Failed to parse configuration", e) + } companion object { private const val MAX_RETRIES = 5L |