summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2020-10-14 14:14:06 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2020-10-19 08:34:58 +0200
commitf86407dfdf0e04979a6765da4eb13f9983e1150e (patch)
treed5a2fb87cfbbd062bf0e3d613cc8a0ee04e7b18d /a1-policy-management/src/main/java
parentdf373ec4c902a2596dd2dfe957425af1e3113b17 (diff)
Made DmaapMessageConsumer asynchronuous
Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e Issue-ID: CCSDK-2502 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management/src/main/java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java174
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java15
3 files changed, 105 insertions, 86 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
index 8409f45c..7f453a27 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
@@ -188,7 +188,7 @@ public class AsyncRestClient {
logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
exception.getResponseBodyAsString());
} else {
- logger.debug("{} HTTP error", traceTag, t);
+ logger.debug("{} HTTP error {}", traceTag, t.getMessage());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
index 3a365178..f948e5f5 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
@@ -20,7 +20,6 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
-import com.google.common.collect.Iterables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
@@ -37,15 +36,17 @@ import java.util.ServiceLoader;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
-import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
/**
* The class fetches incoming requests from DMAAP. It uses the timeout parameter
* that lets the MessageRouter keep the connection with the Kafka open until
@@ -74,60 +75,82 @@ public class DmaapMessageConsumer {
private final AsyncRestClientFactory restClientFactory;
+ private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+
@Value("${server.http-port}")
private int localServerHttpPort;
+ private static class InfiniteFlux {
+ private FluxSink<Integer> sink;
+ private int counter = 0;
+
+ public synchronized Flux<Integer> start() {
+ stop();
+ return Flux.create(this::next).doOnRequest(this::onRequest);
+ }
+
+ public synchronized void stop() {
+ if (this.sink != null) {
+ this.sink.complete();
+ this.sink = null;
+ }
+ }
+
+ void onRequest(long no) {
+ logger.debug("InfiniteFlux.onRequest {}", no);
+ for (long i = 0; i < no; ++i) {
+ sink.next(counter++);
+ }
+ }
+
+ void next(FluxSink<Integer> sink) {
+ logger.debug("InfiniteFlux.next");
+ this.sink = sink;
+ sink.next(counter++);
+ }
+
+ }
+
@Autowired
public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- gson = gsonBuilder.create();
+ this.gson = gsonBuilder.create();
this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
}
/**
- * Starts the consumer. If there is a DMaaP configuration, it will start polling
- * for messages. Otherwise it will check regularly for the configuration.
+ * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start
+ * polling for messages. Otherwise it will check regularly for the
+ * configuration.
*
- * @return the running thread, for test purposes.
*/
- public Thread start() {
- Thread thread = new Thread(this::messageHandlingLoop);
- thread.start();
- return thread;
+ public void start() {
+ infiniteSubmitter.stop();
+
+ createTask().subscribe(//
+ value -> logger.debug("DmaapMessageConsumer next: {}", value), //
+ throwable -> logger.error("DmaapMessageConsumer error: {}", throwable), //
+ () -> logger.warn("DmaapMessageConsumer stopped") //
+ );
}
- private void messageHandlingLoop() {
- while (!isStopped()) {
- try {
- if (isDmaapConfigured()) {
- Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
- if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
- logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (DmaapRequestMessage msg : dmaapMsgs) {
- processMsg(msg);
- }
- }
- } else {
- sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
- }
- } catch (Exception e) {
- logger.warn("{}", e.getMessage());
- sleep(TIME_BETWEEN_DMAAP_RETRIES);
- }
- }
+ protected Flux<String> createTask() {
+ return infiniteFlux() //
+ .flatMap(notUsed -> fetchFromDmaap(), 1) //
+ .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
+ .flatMap(this::parseReceivedMessage, 1)//
+ .flatMap(this::handleDmaapMsg, 1) //
+ .onErrorResume(throwable -> Mono.empty());
}
- protected boolean isStopped() {
- return false;
+ protected Flux<Integer> infiniteFlux() {
+ return infiniteSubmitter.start();
}
- protected boolean isDmaapConfigured() {
- String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
- String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
- return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
- && !consumerTopicUrl.isEmpty());
+ protected Mono<Object> delay() {
+ return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty());
}
private <T> List<T> parseList(String jsonString, Class<T> clazz) {
@@ -146,7 +169,36 @@ public class DmaapMessageConsumer {
return result;
}
- private void sendErrorResponse(String response) {
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
+ && !consumerTopicUrl.isEmpty());
+ }
+
+ protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
+ return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage);
+ }
+
+ protected Mono<String> getFromMessageRouter(String topicUrl) {
+ logger.trace("getFromMessageRouter {}", topicUrl);
+ AsyncRestClient c = restClientFactory.createRestClient("");
+ return c.get(topicUrl);
+ }
+
+ protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) {
+ try {
+ logger.trace("parseMessages {}", jsonString);
+ return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class));
+ } catch (Exception e) {
+ logger.error("parseMessages error {}", jsonString);
+ return sendErrorResponse("Could not parse: " + jsonString) //
+ .flatMapMany(s -> Flux.empty());
+ }
+ }
+
+ protected Mono<String> sendErrorResponse(String response) {
+ logger.debug("sendErrorResponse {}", response);
DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
.apiVersion("") //
.correlationId("") //
@@ -158,37 +210,23 @@ public class DmaapMessageConsumer {
.timestamp("") //
.url("URL") //
.build();
- getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+ return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) //
+ .onErrorResume(e -> Mono.empty());
}
- List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
- try {
- return parseList(jsonString, DmaapRequestMessage.class);
- } catch (Exception e) {
- sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
- throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+ private Mono<String> fetchFromDmaap() {
+ if (!this.isDmaapConfigured()) {
+ logger.debug("fetchFromDmaap, no action DMAAP not configured");
+ return delay().flatMap(o -> Mono.empty());
}
- }
-
- protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
+ logger.debug("fetchFromDmaap");
String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
- AsyncRestClient consumer = getMessageRouterConsumer();
- ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
- logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
- if (response.getStatusCode().is2xxSuccessful()) {
- return parseMessages(response.getBody());
- } else {
- throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
- + " " + response.getBody());
- }
- }
- private void processMsg(DmaapRequestMessage msg) {
- logger.debug("Message Reveived from DMAAP : {}", msg);
- getDmaapMessageHandler().handleDmaapMsg(msg);
+ return getFromMessageRouter(topicUrl) //
+ .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty()));
}
- protected DmaapMessageHandler getDmaapMessageHandler() {
+ private DmaapMessageHandler getDmaapMessageHandler() {
if (this.dmaapMessageHandler == null) {
String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort;
AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl);
@@ -199,16 +237,4 @@ public class DmaapMessageConsumer {
return this.dmaapMessageHandler;
}
- protected void sleep(Duration duration) {
- try {
- Thread.sleep(duration.toMillis());
- } catch (Exception e) {
- logger.error("Failed to put the thread to sleep", e);
- }
- }
-
- protected AsyncRestClient getMessageRouterConsumer() {
- return restClientFactory.createRestClient("");
- }
-
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
index 967cab1d..c77087a5 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
@@ -54,20 +54,13 @@ public class DmaapMessageHandler {
this.dmaapClient = dmaapClient;
}
- public void handleDmaapMsg(DmaapRequestMessage msg) {
- try {
- String result = this.createTask(msg).block();
- logger.debug("handleDmaapMsg: {}", result);
- } catch (Exception throwable) {
- logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
- }
- }
-
- Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+ public Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
return this.invokePolicyManagementService(dmaapRequestMessage) //
.onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
.flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage,
- response.getStatusCode()));
+ response.getStatusCode()))
+ .doOnError(t -> logger.warn("Failed to handle DMAAP message : {}", t.getMessage()))//
+ .onErrorResume(t -> Mono.empty());
}
private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,