summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java53
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java89
3 files changed, 76 insertions, 68 deletions
diff --git a/pom.xml b/pom.xml
index ba105d7..fd1f83a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,13 +94,11 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
- <version>4.3.6.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
- <version>1.5.1.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
index 3881642..c4f1eef 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
@@ -30,6 +30,7 @@ import org.onap.aai.restclient.client.RestClient;
import org.onap.pomba.contextaggregator.builder.ContextBuilder;
import org.onap.pomba.contextaggregator.datatypes.POAEvent;
import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.util.UriComponents;
@@ -53,40 +54,38 @@ public class RestRequest {
}
/**
- * Retrieves the model data from the given context builder
+ * Retrieves the model data from the given context builder.
*
- * @param builder
- * @param event
+ * @param builder The context builder.
+ * @param event The audit event.
* @return Returns the JSON response from the context builder
*/
- public static String getModelData(ContextBuilder builder, POAEvent event) {
+ public static String getModelData(ContextBuilder builder, POAEvent event) throws ContextAggregatorException {
RestClient restClient = createRestClient(builder);
- OperationResult result = null;
-
+ OperationResult result;
+
try {
- result = restClient.get(generateUri(builder, event),
- generateHeaders(event.getxTransactionId(), builder), MediaType.APPLICATION_JSON_TYPE);
- } catch(Exception e) {
- log.error("Error getting result from " + builder.getContextName() + " context builder. Reason - " + e.getMessage());
- return null;
+ result = restClient.get(generateUri(builder, event), generateHeaders(event.getxTransactionId(), builder),
+ MediaType.APPLICATION_JSON_TYPE);
+ } catch (Exception e) {
+ log.error("Exception in Rest call", e);
+ throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA,
+ builder.getContextName(), e.getMessage());
}
- if(result != null) {
- if(result.wasSuccessful()) {
- log.debug("Retrieved model data for '" + builder.getContextName() + "': " + result.getResult());
- return result.getResult();
- } else {
- // failed! return null
- log.error(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA.getMessage(builder.getContextName(),
- result.getFailureCause()));
- log.debug("Failed to retrieve model data for '" + builder.getContextName());
- return null;
- }
- } else {
- log.debug("Failed to retrieve model data for '" + builder.getContextName());
- return null;
+ if (result == null) {
+ throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA,
+ builder.getContextName(), "Null result");
+ }
+ if (result.wasSuccessful()) {
+ log.info("Retrieved model data for '{}' context builder. Result: {}", builder.getContextName(), result.getResult());
+ return result.getResult();
}
+ // failed! throw Exception:
+ throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA, builder.getContextName(),
+ result.getFailureCause());
+
}
private static RestClient createRestClient(ContextBuilder builder) {
@@ -112,8 +111,8 @@ public class RestRequest {
}
private static String getBasicAuthString(ContextBuilder builder) {
- String encodedString = Base64.getEncoder().encodeToString(( builder.getUsername() + ":" +
- Password.deobfuscate(builder.getPassword())).getBytes());
+ String encodedString = Base64.getEncoder()
+ .encodeToString((builder.getUsername() + ":" + Password.deobfuscate(builder.getPassword())).getBytes());
return BASIC_AUTH + encodedString;
}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
index 8b8f8f6..21f3a3e 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -15,8 +15,19 @@
* limitations under the License.
* ============LICENSE_END=====================================================
*/
+
package org.onap.pomba.contextaggregator.service;
+import com.att.aft.dme2.internal.gson.Gson;
+import com.att.aft.dme2.internal.gson.GsonBuilder;
+import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,6 +41,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.onap.pomba.common.datatypes.DataQuality;
+import org.onap.pomba.common.datatypes.DataQuality.Status;
+import org.onap.pomba.common.datatypes.ModelContext;
import org.onap.pomba.contextaggregator.builder.ContextBuilder;
import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
import org.onap.pomba.contextaggregator.datatypes.AggregatedModels;
@@ -44,16 +58,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import com.att.aft.dme2.internal.gson.Gson;
-import com.att.aft.dme2.internal.gson.GsonBuilder;
-import com.att.aft.dme2.internal.gson.JsonSyntaxException;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRPublisher;
-import com.att.nsa.mr.client.MRTopicManager;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-
@Component
public class ContextAggregatorProcessor implements Callable<Void> {
@@ -85,15 +89,15 @@ public class ContextAggregatorProcessor implements Callable<Void> {
* Parses the consumed event, retrieves model data from all context builders and publishes the
* aggregated models to DMaaP.
*
- * @param payload
+ * @param payload The event pay load
*/
public void process(String payload) {
- log.debug("Consumed event: " + payload);
+ log.info("Consumed event: {}", payload);
POAEvent event;
try {
event = parseEvent(payload);
- log.debug("Received POA event: " + event.toString());
+ log.info("Received POA event: {}", event);
} catch (ContextAggregatorException e) {
log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage()));
// TODO: publish to error topic?
@@ -102,14 +106,20 @@ public class ContextAggregatorProcessor implements Callable<Void> {
Map<String, String> retrievedModels = new HashMap<>();
for (ContextBuilder builder : contextBuilders) {
- log.debug("Retrieving model data for: " + builder.getContextName());
- String modelData = RestRequest.getModelData(builder, event);
- if (modelData == null) {
- // If one of the Context builder return error, Aggregator will not publish the event
- log.info("Error returned from one of the Context builders, no event will be published.");
- return;
- } else {
+ try {
+ log.info("Retrieving model data for: {}", builder.getContextName());
+ String modelData = RestRequest.getModelData(builder, event);
retrievedModels.put(builder.getContextName(), modelData);
+ } catch (ContextAggregatorException e) {
+ DataQuality errorDataQuality = new DataQuality();
+ errorDataQuality.setStatus(Status.error);
+ errorDataQuality.setErrorText(e.getMessage());
+ ModelContext modelContext = new ModelContext();
+ modelContext.setDataQuality(errorDataQuality);
+ Gson gsonBuilder = new GsonBuilder().create();
+ String errorData = gsonBuilder.toJson(modelContext);
+ log.error("Setting dataQuality status for '{}' context builder to ERROR: {}.", builder.getContextName(), e.getMessage());
+ retrievedModels.put(builder.getContextName(), errorData);
}
}
@@ -164,14 +174,14 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
/**
- * Publishes the aggregated models
+ * Publishes the aggregated models.
*
* @param models
* @throws ContextAggregatorException
*/
private void publishModels(AggregatedModels models) throws ContextAggregatorException {
String payload = models.generateJsonPayload();
- log.debug("Publishing models: " + payload);
+ log.info("Publishing models: {}", payload);
retriesRemaining = publisherFactory.getRetries();
publish(Arrays.asList(payload));
}
@@ -188,7 +198,7 @@ public class ContextAggregatorProcessor implements Callable<Void> {
String partition = publisherFactory.getPartition();
try {
((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition);
- final Collection<MRPublisher.message> dmaapMessages = new ArrayList<MRPublisher.message>();
+ final Collection<MRPublisher.message> dmaapMessages = new ArrayList<>();
for (final String message : messages) {
dmaapMessages.add(new MRPublisher.message(partition, message));
}
@@ -223,7 +233,7 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
/**
- * Retries to publish messages or throws the given exception if no retries are left
+ * Retries to publish messages or throws the given exception if no retries are left.
*
* @param messages
* @param exceptionToThrow
@@ -234,8 +244,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
if (retriesRemaining <= 0) {
throw exceptionToThrow;
}
- log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining,
- ((retriesRemaining == 1) ? "retry" : "retries")));
+ log.info("Retrying to publish messages ({} {} remaining)...", retriesRemaining,
+ ((retriesRemaining == 1) ? "retry" : "retries"));
retriesRemaining--;
publish(messages);
}
@@ -251,21 +261,21 @@ public class ContextAggregatorProcessor implements Callable<Void> {
try {
return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
} catch (Exception e) {
- throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage());
+ throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e);
}
}
private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
- List<String> pombaTopicList = new ArrayList<String>();
+ List<String> pombaTopicList = new ArrayList<>();
String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
for (int i = 0; i < pombaTopicStrSet.length; i++) {
- pombaTopicList.add(new String(pombaTopicStrSet[i]));
+ pombaTopicList.add(pombaTopicStrSet[i]);
}
return pombaTopicList;
}
- private void createPombaTopics () {
+ private void createPombaTopics() {
List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
@@ -273,15 +283,16 @@ public class ContextAggregatorProcessor implements Callable<Void> {
int partitionCount = 1;
int replicationCount = 1;
- for ( String topic_required : requiredTopicList) {
- try {
- messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
- } catch (HttpException e1) {
- log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
- } catch (IOException e) {
- log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
- }
- }
+ for (String topicRequired : requiredTopicList) {
+ try {
+ messageRouterTopicMgr.createTopic(topicRequired, topicDescription, partitionCount, replicationCount);
+ log.info("Created Pomba Topic {}", topicRequired);
+ } catch (HttpException e1) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+ } catch (IOException e) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+ }
+ }
}
}