summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java')
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java89
1 files changed, 50 insertions, 39 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
index 8b8f8f6..21f3a3e 100644
--- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -15,8 +15,19 @@
* limitations under the License.
* ============LICENSE_END=====================================================
*/
+
package org.onap.pomba.contextaggregator.service;
+import com.att.aft.dme2.internal.gson.Gson;
+import com.att.aft.dme2.internal.gson.GsonBuilder;
+import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.MRTopicManager;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,6 +41,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.onap.pomba.common.datatypes.DataQuality;
+import org.onap.pomba.common.datatypes.DataQuality.Status;
+import org.onap.pomba.common.datatypes.ModelContext;
import org.onap.pomba.contextaggregator.builder.ContextBuilder;
import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
import org.onap.pomba.contextaggregator.datatypes.AggregatedModels;
@@ -44,16 +58,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
-import com.att.aft.dme2.internal.gson.Gson;
-import com.att.aft.dme2.internal.gson.GsonBuilder;
-import com.att.aft.dme2.internal.gson.JsonSyntaxException;
-import com.att.nsa.apiClient.http.HttpException;
-import com.att.nsa.mr.client.MRBatchingPublisher;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.MRPublisher;
-import com.att.nsa.mr.client.MRTopicManager;
-import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
-
@Component
public class ContextAggregatorProcessor implements Callable<Void> {
@@ -85,15 +89,15 @@ public class ContextAggregatorProcessor implements Callable<Void> {
* Parses the consumed event, retrieves model data from all context builders and publishes the
* aggregated models to DMaaP.
*
- * @param payload
+ * @param payload The event pay load
*/
public void process(String payload) {
- log.debug("Consumed event: " + payload);
+ log.info("Consumed event: {}", payload);
POAEvent event;
try {
event = parseEvent(payload);
- log.debug("Received POA event: " + event.toString());
+ log.info("Received POA event: {}", event);
} catch (ContextAggregatorException e) {
log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage()));
// TODO: publish to error topic?
@@ -102,14 +106,20 @@ public class ContextAggregatorProcessor implements Callable<Void> {
Map<String, String> retrievedModels = new HashMap<>();
for (ContextBuilder builder : contextBuilders) {
- log.debug("Retrieving model data for: " + builder.getContextName());
- String modelData = RestRequest.getModelData(builder, event);
- if (modelData == null) {
- // If one of the Context builder return error, Aggregator will not publish the event
- log.info("Error returned from one of the Context builders, no event will be published.");
- return;
- } else {
+ try {
+ log.info("Retrieving model data for: {}", builder.getContextName());
+ String modelData = RestRequest.getModelData(builder, event);
retrievedModels.put(builder.getContextName(), modelData);
+ } catch (ContextAggregatorException e) {
+ DataQuality errorDataQuality = new DataQuality();
+ errorDataQuality.setStatus(Status.error);
+ errorDataQuality.setErrorText(e.getMessage());
+ ModelContext modelContext = new ModelContext();
+ modelContext.setDataQuality(errorDataQuality);
+ Gson gsonBuilder = new GsonBuilder().create();
+ String errorData = gsonBuilder.toJson(modelContext);
+ log.error("Setting dataQuality status for '{}' context builder to ERROR: {}.", builder.getContextName(), e.getMessage());
+ retrievedModels.put(builder.getContextName(), errorData);
}
}
@@ -164,14 +174,14 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
/**
- * Publishes the aggregated models
+ * Publishes the aggregated models.
*
* @param models
* @throws ContextAggregatorException
*/
private void publishModels(AggregatedModels models) throws ContextAggregatorException {
String payload = models.generateJsonPayload();
- log.debug("Publishing models: " + payload);
+ log.info("Publishing models: {}", payload);
retriesRemaining = publisherFactory.getRetries();
publish(Arrays.asList(payload));
}
@@ -188,7 +198,7 @@ public class ContextAggregatorProcessor implements Callable<Void> {
String partition = publisherFactory.getPartition();
try {
((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition);
- final Collection<MRPublisher.message> dmaapMessages = new ArrayList<MRPublisher.message>();
+ final Collection<MRPublisher.message> dmaapMessages = new ArrayList<>();
for (final String message : messages) {
dmaapMessages.add(new MRPublisher.message(partition, message));
}
@@ -223,7 +233,7 @@ public class ContextAggregatorProcessor implements Callable<Void> {
}
/**
- * Retries to publish messages or throws the given exception if no retries are left
+ * Retries to publish messages or throws the given exception if no retries are left.
*
* @param messages
* @param exceptionToThrow
@@ -234,8 +244,8 @@ public class ContextAggregatorProcessor implements Callable<Void> {
if (retriesRemaining <= 0) {
throw exceptionToThrow;
}
- log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining,
- ((retriesRemaining == 1) ? "retry" : "retries")));
+ log.info("Retrying to publish messages ({} {} remaining)...", retriesRemaining,
+ ((retriesRemaining == 1) ? "retry" : "retries"));
retriesRemaining--;
publish(messages);
}
@@ -251,21 +261,21 @@ public class ContextAggregatorProcessor implements Callable<Void> {
try {
return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
} catch (Exception e) {
- throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage());
+ throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e);
}
}
private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) {
- List<String> pombaTopicList = new ArrayList<String>();
+ List<String> pombaTopicList = new ArrayList<>();
String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", "");
String[] pombaTopicStrSet = noSpacePombaTopicList.split(",");
for (int i = 0; i < pombaTopicStrSet.length; i++) {
- pombaTopicList.add(new String(pombaTopicStrSet[i]));
+ pombaTopicList.add(pombaTopicStrSet[i]);
}
return pombaTopicList;
}
- private void createPombaTopics () {
+ private void createPombaTopics() {
List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList);
@@ -273,15 +283,16 @@ public class ContextAggregatorProcessor implements Callable<Void> {
int partitionCount = 1;
int replicationCount = 1;
- for ( String topic_required : requiredTopicList) {
- try {
- messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount);
- } catch (HttpException e1) {
- log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
- } catch (IOException e) {
- log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
- }
- }
+ for (String topicRequired : requiredTopicList) {
+ try {
+ messageRouterTopicMgr.createTopic(topicRequired, topicDescription, partitionCount, replicationCount);
+ log.info("Created Pomba Topic {}", topicRequired);
+ } catch (HttpException e1) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage()));
+ } catch (IOException e) {
+ log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage()));
+ }
+ }
}
}