diff options
Diffstat (limited to 'a1-policy-management/src/main/java')
3 files changed, 105 insertions, 86 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index 8409f45c..7f453a27 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -188,7 +188,7 @@ public class AsyncRestClient { logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), exception.getResponseBodyAsString()); } else { - logger.debug("{} HTTP error", traceTag, t); + logger.debug("{} HTTP error {}", traceTag, t.getMessage()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java index 3a365178..f948e5f5 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java @@ -20,7 +20,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; -import com.google.common.collect.Iterables; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; @@ -37,15 +36,17 @@ import java.util.ServiceLoader; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + /** * The class fetches incoming requests from DMAAP. It uses the timeout parameter * that lets the MessageRouter keep the connection with the Kafka open until @@ -74,60 +75,82 @@ public class DmaapMessageConsumer { private final AsyncRestClientFactory restClientFactory; + private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); + @Value("${server.http-port}") private int localServerHttpPort; + private static class InfiniteFlux { + private FluxSink<Integer> sink; + private int counter = 0; + + public synchronized Flux<Integer> start() { + stop(); + return Flux.create(this::next).doOnRequest(this::onRequest); + } + + public synchronized void stop() { + if (this.sink != null) { + this.sink.complete(); + this.sink = null; + } + } + + void onRequest(long no) { + logger.debug("InfiniteFlux.onRequest {}", no); + for (long i = 0; i < no; ++i) { + sink.next(counter++); + } + } + + void next(FluxSink<Integer> sink) { + logger.debug("InfiniteFlux.next"); + this.sink = sink; + sink.next(counter++); + } + + } + @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - gson = gsonBuilder.create(); + this.gson = gsonBuilder.create(); this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } /** - * Starts the consumer. If there is a DMaaP configuration, it will start polling - * for messages. Otherwise it will check regularly for the configuration. + * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start + * polling for messages. Otherwise it will check regularly for the + * configuration. * - * @return the running thread, for test purposes. */ - public Thread start() { - Thread thread = new Thread(this::messageHandlingLoop); - thread.start(); - return thread; + public void start() { + infiniteSubmitter.stop(); + + createTask().subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {}", value), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable), // + () -> logger.warn("DmaapMessageConsumer stopped") // + ); } - private void messageHandlingLoop() { - while (!isStopped()) { - try { - if (isDmaapConfigured()) { - Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages(); - if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { - logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (DmaapRequestMessage msg : dmaapMsgs) { - processMsg(msg); - } - } - } else { - sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration - } - } catch (Exception e) { - logger.warn("{}", e.getMessage()); - sleep(TIME_BETWEEN_DMAAP_RETRIES); - } - } + protected Flux<String> createTask() { + return infiniteFlux() // + .flatMap(notUsed -> fetchFromDmaap(), 1) // + .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // + .flatMap(this::parseReceivedMessage, 1)// + .flatMap(this::handleDmaapMsg, 1) // + .onErrorResume(throwable -> Mono.empty()); } - protected boolean isStopped() { - return false; + protected Flux<Integer> infiniteFlux() { + return infiniteSubmitter.start(); } - protected boolean isDmaapConfigured() { - String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); - String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); - return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty() - && !consumerTopicUrl.isEmpty()); + protected Mono<Object> delay() { + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty()); } private <T> List<T> parseList(String jsonString, Class<T> clazz) { @@ -146,7 +169,36 @@ public class DmaapMessageConsumer { return result; } - private void sendErrorResponse(String response) { + protected boolean isDmaapConfigured() { + String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); + String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); + return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty() + && !consumerTopicUrl.isEmpty()); + } + + protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) { + return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage); + } + + protected Mono<String> getFromMessageRouter(String topicUrl) { + logger.trace("getFromMessageRouter {}", topicUrl); + AsyncRestClient c = restClientFactory.createRestClient(""); + return c.get(topicUrl); + } + + protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) { + try { + logger.trace("parseMessages {}", jsonString); + return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class)); + } catch (Exception e) { + logger.error("parseMessages error {}", jsonString); + return sendErrorResponse("Could not parse: " + jsonString) // + .flatMapMany(s -> Flux.empty()); + } + } + + protected Mono<String> sendErrorResponse(String response) { + logger.debug("sendErrorResponse {}", response); DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() // .apiVersion("") // .correlationId("") // @@ -158,37 +210,23 @@ public class DmaapMessageConsumer { .timestamp("") // .url("URL") // .build(); - getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block(); + return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) // + .onErrorResume(e -> Mono.empty()); } - List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException { - try { - return parseList(jsonString, DmaapRequestMessage.class); - } catch (Exception e) { - sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString); - throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage()); + private Mono<String> fetchFromDmaap() { + if (!this.isDmaapConfigured()) { + logger.debug("fetchFromDmaap, no action DMAAP not configured"); + return delay().flatMap(o -> Mono.empty()); } - } - - protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException { + logger.debug("fetchFromDmaap"); String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); - AsyncRestClient consumer = getMessageRouterConsumer(); - ResponseEntity<String> response = consumer.getForEntity(topicUrl).block(); - logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody()); - if (response.getStatusCode().is2xxSuccessful()) { - return parseMessages(response.getBody()); - } else { - throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString() - + " " + response.getBody()); - } - } - private void processMsg(DmaapRequestMessage msg) { - logger.debug("Message Reveived from DMAAP : {}", msg); - getDmaapMessageHandler().handleDmaapMsg(msg); + return getFromMessageRouter(topicUrl) // + .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty())); } - protected DmaapMessageHandler getDmaapMessageHandler() { + private DmaapMessageHandler getDmaapMessageHandler() { if (this.dmaapMessageHandler == null) { String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort; AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl); @@ -199,16 +237,4 @@ public class DmaapMessageConsumer { return this.dmaapMessageHandler; } - protected void sleep(Duration duration) { - try { - Thread.sleep(duration.toMillis()); - } catch (Exception e) { - logger.error("Failed to put the thread to sleep", e); - } - } - - protected AsyncRestClient getMessageRouterConsumer() { - return restClientFactory.createRestClient(""); - } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java index 967cab1d..c77087a5 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java @@ -54,20 +54,13 @@ public class DmaapMessageHandler { this.dmaapClient = dmaapClient; } - public void handleDmaapMsg(DmaapRequestMessage msg) { - try { - String result = this.createTask(msg).block(); - logger.debug("handleDmaapMsg: {}", result); - } catch (Exception throwable) { - logger.warn("handleDmaapMsg failure {}", throwable.getMessage()); - } - } - - Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) { + public Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) { return this.invokePolicyManagementService(dmaapRequestMessage) // .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, - response.getStatusCode())); + response.getStatusCode())) + .doOnError(t -> logger.warn("Failed to handle DMAAP message : {}", t.getMessage()))// + .onErrorResume(t -> Mono.empty()); } private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error, |