diff options
Diffstat (limited to 'a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java')
-rw-r--r-- | a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java | 235 |
1 files changed, 0 insertions, 235 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java deleted file mode 100644 index 94888c38..00000000 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ /dev/null @@ -1,235 +0,0 @@ -/*- - * ========================LICENSE_START================================= - * ONAP : ccsdk oran - * ====================================================================== - * Copyright (C) 2020 Nordix Foundation. All rights reserved. - * ====================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ========================LICENSE_END=================================== - */ - -package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; - -import java.lang.invoke.MethodHandles; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; -import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; -import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpStatus; -import org.springframework.stereotype.Component; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; - -/** - * The class fetches incoming requests from DMAAP. It uses the timeout parameter - * that lets the MessageRouter keep the connection with the Kafka open until - * requests are sent in. - * - * <p> - * this service will regularly check the configuration and start polling DMaaP - * if the configuration is added. If the DMaaP configuration is removed, then - * the service will stop polling and resume checking for configuration. - * - * <p> - * Each received request is processed by {@link DmaapMessageHandler}. - */ -@Component -public class DmaapMessageConsumer { - - protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10); - - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final ApplicationConfig applicationConfig; - - private DmaapMessageHandler dmaapMessageHandler = null; - - private final Gson gson; - - private final AsyncRestClientFactory restClientFactory; - - private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); - - @Value("${server.http-port}") - private int localServerHttpPort; - - private static class InfiniteFlux { - private FluxSink<Integer> sink; - private int counter = 0; - - public synchronized Flux<Integer> start() { - stop(); - return Flux.create(this::next).doOnRequest(this::onRequest); - } - - public synchronized void stop() { - if (this.sink != null) { - this.sink.complete(); - this.sink = null; - } - } - - void onRequest(long no) { - for (long i = 0; i < no; ++i) { - sink.next(counter++); - } - } - - void next(FluxSink<Integer> sink) { - this.sink = sink; - sink.next(counter++); - } - - } - - @Autowired - public DmaapMessageConsumer(ApplicationConfig applicationConfig, SecurityContext securityContext) { - this.applicationConfig = applicationConfig; - GsonBuilder gsonBuilder = new GsonBuilder(); - this.gson = gsonBuilder.create(); - this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext); - } - - /** - * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start - * polling for messages. Otherwise it will check regularly for the - * configuration. - * - */ - public void start() { - infiniteSubmitter.stop(); - - createTask().subscribe(// - value -> logger.debug("DmaapMessageConsumer next: {}", value), // - throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("DmaapMessageConsumer stopped") // - ); - } - - protected Flux<String> createTask() { - return infiniteFlux() // - .flatMap(notUsed -> fetchFromDmaap(), 1) // - .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // - .flatMap(this::parseReceivedMessage, 1)// - .flatMap(this::handleDmaapMsg, 1) // - .onErrorResume(throwable -> Mono.empty()); - } - - protected Flux<Integer> infiniteFlux() { - return infiniteSubmitter.start(); - } - - protected Mono<Object> delay() { - return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty()); - } - - private <T> List<T> parseList(String jsonString, Class<T> clazz) { - List<T> result = new ArrayList<>(); - JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray(); - for (JsonElement jsonElement : jsonArr) { - // The element can either be a JsonObject or a JsonString - if (jsonElement.isJsonPrimitive()) { - T json = gson.fromJson(jsonElement.getAsString(), clazz); - result.add(json); - } else { - T json = gson.fromJson(jsonElement.toString(), clazz); - result.add(json); - } - } - return result; - } - - protected boolean isDmaapConfigured() { - String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); - String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); - return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty() - && !consumerTopicUrl.isEmpty()); - } - - protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) { - return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage); - } - - protected Mono<String> getFromMessageRouter(String topicUrl) { - logger.trace("getFromMessageRouter {}", topicUrl); - AsyncRestClient c = restClientFactory.createRestClientNoHttpProxy(""); - return c.get(topicUrl); - } - - protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) { - try { - logger.trace("parseMessages {}", jsonString); - return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class)); - } catch (Exception e) { - logger.error("parseMessages error {}", jsonString); - return sendErrorResponse("Could not parse: " + jsonString) // - .flatMapMany(s -> Flux.empty()); - } - } - - protected Mono<String> sendErrorResponse(String response) { - logger.debug("sendErrorResponse {}", response); - DmaapRequestMessage fakeRequest = DmaapRequestMessage.builder() // - .apiVersion("") // - .correlationId("") // - .operation(DmaapRequestMessage.Operation.PUT) // - .originatorId("") // - .payload(null) // - .requestId("") // - .target("") // - .timestamp("") // - .url("URL") // - .build(); - return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) // - .onErrorResume(e -> Mono.empty()); - } - - private Mono<String> fetchFromDmaap() { - if (!this.isDmaapConfigured()) { - return delay().flatMap(o -> Mono.empty()); - } - logger.debug("fetchFromDmaap"); - String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); - - return getFromMessageRouter(topicUrl) // - .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty())); - } - - private DmaapMessageHandler getDmaapMessageHandler() { - if (this.dmaapMessageHandler == null) { - String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort; - AsyncRestClient pmsClient = restClientFactory.createRestClientNoHttpProxy(pmsBaseUrl); - AsyncRestClient producer = - restClientFactory.createRestClientNoHttpProxy(this.applicationConfig.getDmaapProducerTopicUrl()); - this.dmaapMessageHandler = new DmaapMessageHandler(producer, pmsClient); - } - return this.dmaapMessageHandler; - } - -} |