diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks')
4 files changed, 23 insertions, 45 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java index 066983ae..0780e18e 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,21 +22,14 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import com.google.gson.JsonElement; - -import java.util.Optional; - import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; -import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration; import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage; import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; /** * Component used to get messages from the MessageRouter. @@ -46,18 +40,14 @@ public class DMaaPMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumer.class); private final AppConfig datafileAppConfig; private final JsonMessageParser jsonMessageParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; public DMaaPMessageConsumer(AppConfig datafileAppConfig) { - this(datafileAppConfig, new JsonMessageParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); + this(datafileAppConfig, new JsonMessageParser()); } - protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + protected DMaaPMessageConsumer(AppConfig datafileAppConfig, JsonMessageParser jsonMessageParser) { this.datafileAppConfig = datafileAppConfig; this.jsonMessageParser = jsonMessageParser; - this.httpClientFactory = httpClientFactory; } /** @@ -68,21 +58,20 @@ public class DMaaPMessageConsumer { public Flux<FileReadyMessage> getMessageRouterResponse() { logger.trace("getMessageRouterResponse called"); try { - DMaaPConsumerReactiveHttpClient client = createHttpClient(); - return consume((client.getDMaaPConsumerResponse(Optional.empty()))); - } catch (DatafileTaskException e) { + ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration(); + MessageRouterSubscriber messageRouterSubscriber = + dmaapConsumerConfiguration.getMessageRouterSubscriber(); + Flux<JsonElement> responseElements = + messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()); + return consume(responseElements); + } catch (Exception e) { logger.warn("Unable to get response from message router", e); return Flux.empty(); } } - private Flux<FileReadyMessage> consume(Mono<JsonElement> message) { - logger.trace("consume called with arg {}", message); - return jsonMessageParser.getMessagesFromJson(message); - } - - public DMaaPConsumerReactiveHttpClient createHttpClient() throws DatafileTaskException { - return httpClientFactory.create(datafileAppConfig.getDmaapConsumerConfiguration().toDmaap()); + private Flux<FileReadyMessage> consume(Flux<JsonElement> messages) { + return jsonMessageParser.getMessagesFromJson(messages); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index cfaf1753..8b86440a 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. + * Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +25,6 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import java.io.File; -import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -42,7 +42,6 @@ import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; import org.onap.dcaegen2.collectors.datafile.service.HttpUtils; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -113,7 +112,7 @@ public class DataRouterPublisher { private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); - JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); + JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); URI uri = new DefaultUriBuilderFactory( datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // @@ -155,12 +154,8 @@ public class DataRouterPublisher { } DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { - try { - DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); - return new DmaapProducerHttpClient(cfg); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot resolve producer client", e); - } + PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier); + return new DmaapProducerHttpClient(publisherConfiguration); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java index 037803bd..a9973cf4 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Nordix Foundation. +* Copyright (C) 2020 Nokia. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +22,6 @@ package org.onap.dcaegen2.collectors.datafile.tasks; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; @@ -113,12 +113,7 @@ public class PublishedChecker { return appConfig.getPublisherConfiguration(changeIdentifier); } - protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) - throws DatafileTaskException { - try { - return new DmaapProducerHttpClient(publisherConfig.toDmaap()); - } catch (MalformedURLException e) { - throw new DatafileTaskException("Cannot create published checker client", e); - } + protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) { + return new DmaapProducerHttpClient(publisherConfig); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index 42a6fea3..eba0a6cb 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -1,6 +1,6 @@ /*- * ============LICENSE_START====================================================================== - * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. + * Copyright (C) 2018, 2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved. * =============================================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -302,8 +302,7 @@ public class ScheduledTasks { private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception, Map<String, String> context) { MDC.setContextMap(context); - logger.error("Polling for file ready message failed, exception: {}, config: {}", exception.toString(), - this.applicationConfiguration.getDmaapConsumerConfiguration()); + logger.error("Polling for file ready message failed, exception: {}", exception.toString()); return Flux.empty(); } |