diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
5 files changed, 20 insertions, 29 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java index eb5b51a1..02691446 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -75,7 +75,6 @@ public class BbsActionsTaskImpl implements BbsActionsTask { } public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) { - config.initFileStreamReader(); JsonObject additionalFields = consumerDmaapModel.getAdditionalFields(); if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index e3ea8962..3a630a40 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -33,8 +33,6 @@ import reactor.core.publisher.Flux; */ interface DmaapConsumerTask { - void initConfigs(); - Flux<ConsumerDmaapModel> execute(String object) throws SSLException; DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index 0d71477c..d3086cbe 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -63,11 +63,6 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask { } @Override - public void initConfigs() { - config.initFileStreamReader(); - } - - @Override public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); LOGGER.debug("Method called with arg {}", object); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index aae5bc77..72ec0cac 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -48,7 +48,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final DmaapConsumerTask dmaapConsumerTask; private final DmaapPublisherTask dmaapReadyProducerTask; @@ -100,36 +100,36 @@ public class ScheduledTasks { public void scheduleMainPrhEventTask() { MdcVariables.setMdcContextMap(mdcContextMap); try { - logger.trace("Execution of tasks was registered"); + LOGGER.trace("Execution of tasks was registered"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromDMaaPMessage() .doOnError(DmaapEmptyResponseException.class, error -> - logger.warn("Nothing to consume from DMaaP") + LOGGER.warn("Nothing to consume from DMaaP") ) .flatMap(this::queryAaiForConfiguration) .flatMap(this::publishToAaiConfiguration) .doOnError(exception -> - logger.warn("AAIProducerTask exception has been registered: ", exception)) + LOGGER.warn("AAIProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .flatMap(this::processAdditionalFields) .doOnError(exception -> - logger.warn("BBSActionsTask exception has been registered: ", exception)) + LOGGER.warn("BBSActionsTask exception has been registered: ", exception)) .flatMap(this::publishToDmaapConfigurationWithApache) .doOnError(exception -> - logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception)) .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) .doOnTerminate(mainCountDownLatch::countDown) .subscribe(this::onSuccess, this::onError, this::onComplete); mainCountDownLatch.await(); } catch (InterruptedException e) { - logger.warn("Interruption problem on countDownLatch ", e); + LOGGER.warn("Interruption problem on countDownLatch ", e); Thread.currentThread().interrupt(); } } private void onComplete() { - logger.info("PRH tasks have been completed"); + LOGGER.info("PRH tasks have been completed"); } /** @@ -139,7 +139,7 @@ public class ScheduledTasks { private void onSuccess(HttpClientResponse response) { String statusCode = Integer.toString(response.status().code()); MDC.put(RESPONSE_CODE, statusCode); - logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCode); MDC.remove(RESPONSE_CODE); } @@ -147,7 +147,7 @@ public class ScheduledTasks { private void onSuccess(HttpResponse response) { String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); MDC.put(RESPONSE_CODE, statusCode); - logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", + LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCode); MDC.remove(RESPONSE_CODE); } @@ -155,7 +155,7 @@ public class ScheduledTasks { private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); } } @@ -163,8 +163,7 @@ public class ScheduledTasks { return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); - logger.info(INVOKE, "Init configs"); - dmaapConsumerTask.initConfigs(); + LOGGER.info(INVOKE, "Init configs"); return consumeFromDMaaP(); }); } @@ -197,7 +196,7 @@ public class ScheduledTasks { private Mono<State> processAdditionalFields(final State state) { if (state.ActivationStatus) { - logger.debug("Re-registration - Logical links won't be updated."); + LOGGER.debug("Re-registration - Logical links won't be updated."); return Mono.just(state); } @@ -213,7 +212,7 @@ public class ScheduledTasks { publishToDmaapConfiguration(final State state) { try { if (state.ActivationStatus) { - logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); return dmaapUpdateProducerTask.execute(state.DmaapModel); } @@ -227,7 +226,7 @@ public class ScheduledTasks { publishToDmaapConfigurationWithApache(final State state) { try { if (state.ActivationStatus) { - logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index e20e25d8..e9c18109 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.concurrent.ScheduledFuture; import javax.annotation.PostConstruct; -import org.onap.dcaegen2.services.prh.configuration.ConsulConfiguration; +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -57,15 +57,15 @@ public class ScheduledTasksRunner { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final ConsulConfiguration consulConfiguration; + private final CbsConfiguration cbsConfiguration; @Autowired public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, - ConsulConfiguration consulConfiguration) { + CbsConfiguration cbsConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.consulConfiguration = consulConfiguration; + this.cbsConfiguration = cbsConfiguration; } /** @@ -94,7 +94,7 @@ public class ScheduledTasksRunner { LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { scheduledPrhTaskFutureList.add(taskScheduler - .scheduleAtFixedRate(consulConfiguration::runTask, Instant.now(), + .scheduleAtFixedRate(cbsConfiguration::runTask, Instant.now(), Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); scheduledPrhTaskFutureList.add(taskScheduler .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, |