aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
diff options
context:
space:
mode:
authorpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
committerpwielebs <piotr.wielebski@nokia.com>2018-09-04 09:29:49 +0200
commit83df6e1df5ec20627c85af9ba2f49036dd58f328 (patch)
treede9282995bc4c7b0d0f277760b1d6f3574970794 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
parent3c2766d8a64d21f402b5234e33419a8aed14d7ea (diff)
Refatoring due to prh workflow
1. Added specified HttpClient for DmaaPPublisher: *DmaaP Handle transfer-encoding: chunk header and reject the request if it will be set by the client. In conclusion no other reactive http client can be used for pushing something to dmaap. 2. Added sll support to A&AI rective webclient. *Behaviour of reactive A&AI HttpClient is different as in native spring have without it. 3. Added 10s fixed time in PRH for requesting DmaaP. 4. Added debug log in reactive/native http clients. 5. Fixed reactive workflow of prh. 6. Updated the version of: * spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE * spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE * spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE * reactor-bom:Bismuth-RELEASE->Bismuth-SR10 Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8 Issue-ID: DCAEGEN2-743 Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java55
1 files changed, 33 insertions, 22 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 6432a338..f74bc56a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers;
@Component
public class ScheduledTasks {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
private final DmaapConsumerTask dmaapConsumerTask;
private final DmaapPublisherTask dmaapProducerTask;
private final AaiProducerTask aaiProducerTask;
@@ -72,24 +72,33 @@ public class ScheduledTasks {
*/
public void scheduleMainPrhEventTask() {
MDCVariables.setMdcContextMap(contextMap);
- logger.trace("Execution of tasks was registered");
-
- Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
- .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
- .map(this::publishToAaiConfiguration)
- .flatMap(this::publishToDmaapConfiguration)
- .subscribeOn(Schedulers.elastic());
+ try {
+ logger.trace("Execution of tasks was registered");
+ CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+ consumeFromDMaaPMessage()
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ logger.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::publishToDmaapConfiguration)
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
- dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+ mainCountDownLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
+
private void onComplete() {
logger.info("PRH tasks have been completed");
}
- private void onSuccess(String responseCode) {
- MDC.put(RESPONSE_CODE, responseCode);
- logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+ private void onSuccess(ResponseEntity<String> responseCode) {
+ MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+ responseCode.getStatusCode().value());
}
private void onError(Throwable throwable) {
@@ -98,24 +107,26 @@ public class ScheduledTasks {
}
}
- private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
- return () -> {
+
+ private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+ return Mono.defer(() -> {
MDCVariables.setMdcContextMap(contextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
+ logger.info("Init configs");
dmaapConsumerTask.initConfigs();
return dmaapConsumerTask.execute("");
- };
+ });
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+ private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
try {
return aaiProducerTask.execute(monoDMaaPModel);
- } catch (PrhTaskException e) {
+ } catch (PrhTaskException | SSLException e) {
return Mono.error(e);
}
}
- private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
+ private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
try {
return dmaapProducerTask.execute(monoAaiModel);
} catch (PrhTaskException e) {