diff options
Diffstat (limited to 'a1-policy-management/src/main/java')
3 files changed, 64 insertions, 42 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java index 37a092bb..a4da4bdd 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java @@ -21,13 +21,18 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; import com.google.common.collect.Iterables; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.TypeAdapterFactory; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.ServiceLoader; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; @@ -36,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; @@ -63,12 +69,17 @@ public class DmaapMessageConsumer { private DmaapMessageHandler dmaapMessageHandler = null; + private final Gson gson; + @Value("${server.http-port}") private int localServerHttpPort; @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; + GsonBuilder gsonBuilder = new GsonBuilder(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + gson = gsonBuilder.create(); } /** @@ -87,10 +98,10 @@ public class DmaapMessageConsumer { while (!isStopped()) { try { if (isDmaapConfigured()) { - Iterable<String> dmaapMsgs = fetchAllMessages(); + Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages(); if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (String msg : dmaapMsgs) { + for (DmaapRequestMessage msg : dmaapMsgs) { processMsg(msg); } } @@ -115,21 +126,47 @@ public class DmaapMessageConsumer { && !consumerTopicUrl.isEmpty()); } - private static List<String> parseMessages(String jsonString) { - JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray(); - List<String> result = new ArrayList<>(); - for (JsonElement element : arrayOfMessages) { - if (element.isJsonPrimitive()) { - result.add(element.getAsString()); + private <T> List<T> parseList(String jsonString, Class<T> clazz) { + List<T> result = new ArrayList<>(); + JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray(); + for (JsonElement jsonElement : jsonArr) { + // The element can either be a JsonObject or a JsonString + if (jsonElement.isJsonPrimitive()) { + T json = gson.fromJson(jsonElement.getAsString(), clazz); + result.add(json); } else { - String messageAsString = element.toString(); - result.add(messageAsString); + T json = gson.fromJson(jsonElement.toString(), clazz); + result.add(json); } } return result; } - protected Iterable<String> fetchAllMessages() throws ServiceException { + private void sendErrorResponse(String response) { + DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() // + .apiVersion("") // + .correlationId("") // + .operation(DmaapRequestMessage.Operation.PUT) // + .originatorId("") // + .payload(Optional.empty()) // + .requestId("") // + .target("") // + .timestamp("") // + .url("URL") // + .build(); + getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block(); + } + + List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException { + try { + return parseList(jsonString, DmaapRequestMessage.class); + } catch (Exception e) { + sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString); + throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage()); + } + } + + protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException { String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); AsyncRestClient consumer = getMessageRouterConsumer(); ResponseEntity<String> response = consumer.getForEntity(topicUrl).block(); @@ -142,7 +179,7 @@ public class DmaapMessageConsumer { } } - private void processMsg(String msg) { + private void processMsg(DmaapRequestMessage msg) { logger.debug("Message Reveived from DMAAP : {}", msg); getDmaapMessageHandler().handleDmaapMsg(msg); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java index 040d8b3e..2d7b5063 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java @@ -40,12 +40,12 @@ import reactor.core.publisher.Mono; /** * The class handles incoming requests from DMAAP. * <p> - * That means: invoke a REST call towards this services and to send back a response though DMAAP + * That means: invoke a REST call towards this services and to send back a + * response though DMAAP */ public class DmaapMessageHandler { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); - private static Gson gson = new GsonBuilder() // - .create(); // + private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient dmaapClient; private final AsyncRestClient pmsClient; @@ -54,7 +54,7 @@ public class DmaapMessageHandler { this.dmaapClient = dmaapClient; } - public void handleDmaapMsg(String msg) { + public void handleDmaapMsg(DmaapRequestMessage msg) { try { String result = this.createTask(msg).block(); logger.debug("handleDmaapMsg: {}", result); @@ -63,17 +63,10 @@ public class DmaapMessageHandler { } } - Mono<String> createTask(String msg) { - try { - DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class); - return this.invokePolicyManagementService(dmaapRequestMessage) // - .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // - .flatMap( - response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); - } catch (Exception e) { - String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage(); - return Mono.error(new ServiceException(errorMsg)); // Cannot make any response - } + Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) { + return this.invokePolicyManagementService(dmaapRequestMessage) // + .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // + .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); } private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error, @@ -95,6 +88,12 @@ public class DmaapMessageHandler { .flatMap(notUsed -> Mono.empty()); } + public Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, HttpStatus status) { + return createDmaapResponseMessage(dmaapRequestMessage, response, status) // + .flatMap(this::sendToDmaap) // + .onErrorResume(this::handleResponseCallError); + } + private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) { DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation(); String uri = dmaapRequestMessage.url(); @@ -122,20 +121,13 @@ public class DmaapMessageHandler { } } - private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, - HttpStatus status) { - return createDmaapResponseMessage(dmaapRequestMessage, response, status) // - .flatMap(this::sendToDmaap) // - .onErrorResume(this::handleResponseCallError); - } - private Mono<String> sendToDmaap(String body) { logger.debug("sendToDmaap: {} ", body); return dmaapClient.post("", "[" + body + "]"); } private Mono<String> handleResponseCallError(Throwable t) { - logger.debug("Failed to send response to DMaaP: {}", t.getMessage()); + logger.warn("Failed to send response to DMaaP: {}", t.getMessage()); return Mono.empty(); } @@ -152,6 +144,5 @@ public class DmaapMessageHandler { .build(); String str = gson.toJson(dmaapResponseMessage); return Mono.just(str); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 57d6969f..248ba32d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -20,11 +20,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; -import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; import java.io.File; @@ -34,7 +32,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.time.Duration; import java.util.Properties; -import java.util.ServiceLoader; import javax.validation.constraints.NotNull; @@ -239,9 +236,6 @@ public class RefreshConfigTask { return Flux.empty(); } - GsonBuilder gsonBuilder = new GsonBuilder(); - ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - try (InputStream inputStream = createInputStream(filepath)) { JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); ApplicationConfigParser appParser = new ApplicationConfigParser(); |