aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org
diff options
context:
space:
mode:
authorKAPIL SINGAL <ks220y@att.com>2020-08-27 12:43:12 +0000
committerGerrit Code Review <gerrit@onap.org>2020-08-27 12:43:12 +0000
commit9bb32130eed83b9116f3e5d6f51a5a386c2fbe49 (patch)
tree1f306de80f3d71bf6f55e60c269b749bd8d3891a /a1-policy-management/src/main/java/org
parent7e634df0672f4ed7e6a89c373dfab08439cea630 (diff)
parentcbe6e68610bb477d64d776df2fbf0eccece533eb (diff)
Merge "DMAAP Improvements"
Diffstat (limited to 'a1-policy-management/src/main/java/org')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java61
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java39
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java6
3 files changed, 64 insertions, 42 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
index 37a092bb..a4da4bdd 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
@@ -21,13 +21,18 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
import com.google.common.collect.Iterables;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.TypeAdapterFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
@@ -36,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
@@ -63,12 +69,17 @@ public class DmaapMessageConsumer {
private DmaapMessageHandler dmaapMessageHandler = null;
+ private final Gson gson;
+
@Value("${server.http-port}")
private int localServerHttpPort;
@Autowired
public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ gson = gsonBuilder.create();
}
/**
@@ -87,10 +98,10 @@ public class DmaapMessageConsumer {
while (!isStopped()) {
try {
if (isDmaapConfigured()) {
- Iterable<String> dmaapMsgs = fetchAllMessages();
+ Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (String msg : dmaapMsgs) {
+ for (DmaapRequestMessage msg : dmaapMsgs) {
processMsg(msg);
}
}
@@ -115,21 +126,47 @@ public class DmaapMessageConsumer {
&& !consumerTopicUrl.isEmpty());
}
- private static List<String> parseMessages(String jsonString) {
- JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
- List<String> result = new ArrayList<>();
- for (JsonElement element : arrayOfMessages) {
- if (element.isJsonPrimitive()) {
- result.add(element.getAsString());
+ private <T> List<T> parseList(String jsonString, Class<T> clazz) {
+ List<T> result = new ArrayList<>();
+ JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
+ for (JsonElement jsonElement : jsonArr) {
+ // The element can either be a JsonObject or a JsonString
+ if (jsonElement.isJsonPrimitive()) {
+ T json = gson.fromJson(jsonElement.getAsString(), clazz);
+ result.add(json);
} else {
- String messageAsString = element.toString();
- result.add(messageAsString);
+ T json = gson.fromJson(jsonElement.toString(), clazz);
+ result.add(json);
}
}
return result;
}
- protected Iterable<String> fetchAllMessages() throws ServiceException {
+ private void sendErrorResponse(String response) {
+ DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
+ .apiVersion("") //
+ .correlationId("") //
+ .operation(DmaapRequestMessage.Operation.PUT) //
+ .originatorId("") //
+ .payload(Optional.empty()) //
+ .requestId("") //
+ .target("") //
+ .timestamp("") //
+ .url("URL") //
+ .build();
+ getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+ }
+
+ List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
+ try {
+ return parseList(jsonString, DmaapRequestMessage.class);
+ } catch (Exception e) {
+ sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
+ throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+ }
+ }
+
+ protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
AsyncRestClient consumer = getMessageRouterConsumer();
ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
@@ -142,7 +179,7 @@ public class DmaapMessageConsumer {
}
}
- private void processMsg(String msg) {
+ private void processMsg(DmaapRequestMessage msg) {
logger.debug("Message Reveived from DMAAP : {}", msg);
getDmaapMessageHandler().handleDmaapMsg(msg);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
index 040d8b3e..2d7b5063 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
@@ -40,12 +40,12 @@ import reactor.core.publisher.Mono;
/**
* The class handles incoming requests from DMAAP.
* <p>
- * That means: invoke a REST call towards this services and to send back a response though DMAAP
+ * That means: invoke a REST call towards this services and to send back a
+ * response though DMAAP
*/
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
- private static Gson gson = new GsonBuilder() //
- .create(); //
+ private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient dmaapClient;
private final AsyncRestClient pmsClient;
@@ -54,7 +54,7 @@ public class DmaapMessageHandler {
this.dmaapClient = dmaapClient;
}
- public void handleDmaapMsg(String msg) {
+ public void handleDmaapMsg(DmaapRequestMessage msg) {
try {
String result = this.createTask(msg).block();
logger.debug("handleDmaapMsg: {}", result);
@@ -63,17 +63,10 @@ public class DmaapMessageHandler {
}
}
- Mono<String> createTask(String msg) {
- try {
- DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
- return this.invokePolicyManagementService(dmaapRequestMessage) //
- .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
- .flatMap(
- response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
- } catch (Exception e) {
- String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
- return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
- }
+ Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+ return this.invokePolicyManagementService(dmaapRequestMessage) //
+ .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
+ .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
}
private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
@@ -95,6 +88,12 @@ public class DmaapMessageHandler {
.flatMap(notUsed -> Mono.empty());
}
+ public Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, HttpStatus status) {
+ return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) {
DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
String uri = dmaapRequestMessage.url();
@@ -122,20 +121,13 @@ public class DmaapMessageHandler {
}
}
- private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
- HttpStatus status) {
- return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
- .flatMap(this::sendToDmaap) //
- .onErrorResume(this::handleResponseCallError);
- }
-
private Mono<String> sendToDmaap(String body) {
logger.debug("sendToDmaap: {} ", body);
return dmaapClient.post("", "[" + body + "]");
}
private Mono<String> handleResponseCallError(Throwable t) {
- logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
+ logger.warn("Failed to send response to DMaaP: {}", t.getMessage());
return Mono.empty();
}
@@ -152,6 +144,5 @@ public class DmaapMessageHandler {
.build();
String str = gson.toJson(dmaapResponseMessage);
return Mono.just(str);
-
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 57d6969f..248ba32d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -20,11 +20,9 @@
package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
import java.io.File;
@@ -34,7 +32,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Duration;
import java.util.Properties;
-import java.util.ServiceLoader;
import javax.validation.constraints.NotNull;
@@ -239,9 +236,6 @@ public class RefreshConfigTask {
return Flux.empty();
}
- GsonBuilder gsonBuilder = new GsonBuilder();
- ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
-
try (InputStream inputStream = createInputStream(filepath)) {
JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
ApplicationConfigParser appParser = new ApplicationConfigParser();