diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java | 55 |
1 files changed, 33 insertions, 22 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 6432a338..f74bc56a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; @@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers; @Component public class ScheduledTasks { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapProducerTask; private final AaiProducerTask aaiProducerTask; @@ -72,24 +72,33 @@ public class ScheduledTasks { */ public void scheduleMainPrhEventTask() { MDCVariables.setMdcContextMap(contextMap); - logger.trace("Execution of tasks was registered"); - - Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage()) - .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP")) - .map(this::publishToAaiConfiguration) - .flatMap(this::publishToDmaapConfiguration) - .subscribeOn(Schedulers.elastic()); + try { + logger.trace("Execution of tasks was registered"); + CountDownLatch mainCountDownLatch = new CountDownLatch(1); + consumeFromDMaaPMessage() + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::publishToDmaapConfiguration) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); - dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete); + mainCountDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + private void onComplete() { logger.info("PRH tasks have been completed"); } - private void onSuccess(String responseCode) { - MDC.put(RESPONSE_CODE, responseCode); - logger.info("Prh consumed tasks. HTTP Response code {}", responseCode); + private void onSuccess(ResponseEntity<String> responseCode) { + MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); + logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + responseCode.getStatusCode().value()); } private void onError(Throwable throwable) { @@ -98,24 +107,26 @@ public class ScheduledTasks { } } - private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() { - return () -> { + + private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() { + return Mono.defer(() -> { MDCVariables.setMdcContextMap(contextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); + logger.info("Init configs"); dmaapConsumerTask.initConfigs(); return dmaapConsumerTask.execute(""); - }; + }); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) { + private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { try { return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException e) { + } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) { + private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { try { return dmaapProducerTask.execute(monoAaiModel); } catch (PrhTaskException e) { |