aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java1
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java2
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java31
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java10
5 files changed, 20 insertions, 29 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
index eb5b51a1..02691446 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
@@ -75,7 +75,6 @@ public class BbsActionsTaskImpl implements BbsActionsTask {
}
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) {
- config.initFileStreamReader();
JsonObject additionalFields = consumerDmaapModel.getAdditionalFields();
if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index e3ea8962..3a630a40 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -33,8 +33,6 @@ import reactor.core.publisher.Flux;
*/
interface DmaapConsumerTask {
- void initConfigs();
-
Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 0d71477c..d3086cbe 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -63,11 +63,6 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
}
@Override
- public void initConfigs() {
- config.initFileStreamReader();
- }
-
- @Override
public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index aae5bc77..72ec0cac 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -48,7 +48,7 @@ import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapReadyProducerTask;
@@ -100,36 +100,36 @@ public class ScheduledTasks {
public void scheduleMainPrhEventTask() {
MdcVariables.setMdcContextMap(mdcContextMap);
try {
- logger.trace("Execution of tasks was registered");
+ LOGGER.trace("Execution of tasks was registered");
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
consumeFromDMaaPMessage()
.doOnError(DmaapEmptyResponseException.class, error ->
- logger.warn("Nothing to consume from DMaaP")
+ LOGGER.warn("Nothing to consume from DMaaP")
)
.flatMap(this::queryAaiForConfiguration)
.flatMap(this::publishToAaiConfiguration)
.doOnError(exception ->
- logger.warn("AAIProducerTask exception has been registered: ", exception))
+ LOGGER.warn("AAIProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.flatMap(this::processAdditionalFields)
.doOnError(exception ->
- logger.warn("BBSActionsTask exception has been registered: ", exception))
+ LOGGER.warn("BBSActionsTask exception has been registered: ", exception))
.flatMap(this::publishToDmaapConfigurationWithApache)
.doOnError(exception ->
- logger.warn("DMaaPProducerTask exception has been registered: ", exception))
+ LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.doOnTerminate(mainCountDownLatch::countDown)
.subscribe(this::onSuccess, this::onError, this::onComplete);
mainCountDownLatch.await();
} catch (InterruptedException e) {
- logger.warn("Interruption problem on countDownLatch ", e);
+ LOGGER.warn("Interruption problem on countDownLatch ", e);
Thread.currentThread().interrupt();
}
}
private void onComplete() {
- logger.info("PRH tasks have been completed");
+ LOGGER.info("PRH tasks have been completed");
}
/**
@@ -139,7 +139,7 @@ public class ScheduledTasks {
private void onSuccess(HttpClientResponse response) {
String statusCode = Integer.toString(response.status().code());
MDC.put(RESPONSE_CODE, statusCode);
- logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
statusCode);
MDC.remove(RESPONSE_CODE);
}
@@ -147,7 +147,7 @@ public class ScheduledTasks {
private void onSuccess(HttpResponse response) {
String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
MDC.put(RESPONSE_CODE, statusCode);
- logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
statusCode);
MDC.remove(RESPONSE_CODE);
}
@@ -155,7 +155,7 @@ public class ScheduledTasks {
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+ LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
}
}
@@ -163,8 +163,7 @@ public class ScheduledTasks {
return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
- logger.info(INVOKE, "Init configs");
- dmaapConsumerTask.initConfigs();
+ LOGGER.info(INVOKE, "Init configs");
return consumeFromDMaaP();
});
}
@@ -197,7 +196,7 @@ public class ScheduledTasks {
private Mono<State> processAdditionalFields(final State state) {
if (state.ActivationStatus) {
- logger.debug("Re-registration - Logical links won't be updated.");
+ LOGGER.debug("Re-registration - Logical links won't be updated.");
return Mono.just(state);
}
@@ -213,7 +212,7 @@ public class ScheduledTasks {
publishToDmaapConfiguration(final State state) {
try {
if (state.ActivationStatus) {
- logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+ LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
return dmaapUpdateProducerTask.execute(state.DmaapModel);
}
@@ -227,7 +226,7 @@ public class ScheduledTasks {
publishToDmaapConfigurationWithApache(final State state) {
try {
if (state.ActivationStatus) {
- logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+ LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
index e20e25d8..e9c18109 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PostConstruct;
-import org.onap.dcaegen2.services.prh.configuration.ConsulConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@@ -57,15 +57,15 @@ public class ScheduledTasksRunner {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final ConsulConfiguration consulConfiguration;
+ private final CbsConfiguration cbsConfiguration;
@Autowired
public ScheduledTasksRunner(TaskScheduler taskScheduler,
ScheduledTasks scheduledTask,
- ConsulConfiguration consulConfiguration) {
+ CbsConfiguration cbsConfiguration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
- this.consulConfiguration = consulConfiguration;
+ this.cbsConfiguration = cbsConfiguration;
}
/**
@@ -94,7 +94,7 @@ public class ScheduledTasksRunner {
LOGGER.info(ENTRY, "Start scheduling PRH workflow");
if (scheduledPrhTaskFutureList.isEmpty()) {
scheduledPrhTaskFutureList.add(taskScheduler
- .scheduleAtFixedRate(consulConfiguration::runTask, Instant.now(),
+ .scheduleAtFixedRate(cbsConfiguration::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
.scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask,