diff options
-rw-r--r-- | pom.xml | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java | 53 | ||||
-rw-r--r-- | src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java | 89 |
3 files changed, 76 insertions, 68 deletions
@@ -94,13 +94,11 @@ <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> - <version>4.3.6.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-test</artifactId> - <version>1.5.1.RELEASE</version> <scope>test</scope> </dependency> <dependency> diff --git a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java index 3881642..c4f1eef 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java +++ b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java @@ -30,6 +30,7 @@ import org.onap.aai.restclient.client.RestClient; import org.onap.pomba.contextaggregator.builder.ContextBuilder; import org.onap.pomba.contextaggregator.datatypes.POAEvent; import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.util.UriComponents; @@ -53,40 +54,38 @@ public class RestRequest { } /** - * Retrieves the model data from the given context builder + * Retrieves the model data from the given context builder. * - * @param builder - * @param event + * @param builder The context builder. + * @param event The audit event. * @return Returns the JSON response from the context builder */ - public static String getModelData(ContextBuilder builder, POAEvent event) { + public static String getModelData(ContextBuilder builder, POAEvent event) throws ContextAggregatorException { RestClient restClient = createRestClient(builder); - OperationResult result = null; - + OperationResult result; + try { - result = restClient.get(generateUri(builder, event), - generateHeaders(event.getxTransactionId(), builder), MediaType.APPLICATION_JSON_TYPE); - } catch(Exception e) { - log.error("Error getting result from " + builder.getContextName() + " context builder. Reason - " + e.getMessage()); - return null; + result = restClient.get(generateUri(builder, event), generateHeaders(event.getxTransactionId(), builder), + MediaType.APPLICATION_JSON_TYPE); + } catch (Exception e) { + log.error("Exception in Rest call", e); + throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA, + builder.getContextName(), e.getMessage()); } - if(result != null) { - if(result.wasSuccessful()) { - log.debug("Retrieved model data for '" + builder.getContextName() + "': " + result.getResult()); - return result.getResult(); - } else { - // failed! return null - log.error(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA.getMessage(builder.getContextName(), - result.getFailureCause())); - log.debug("Failed to retrieve model data for '" + builder.getContextName()); - return null; - } - } else { - log.debug("Failed to retrieve model data for '" + builder.getContextName()); - return null; + if (result == null) { + throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA, + builder.getContextName(), "Null result"); + } + if (result.wasSuccessful()) { + log.info("Retrieved model data for '{}' context builder. Result: {}", builder.getContextName(), result.getResult()); + return result.getResult(); } + // failed! throw Exception: + throw new ContextAggregatorException(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA, builder.getContextName(), + result.getFailureCause()); + } private static RestClient createRestClient(ContextBuilder builder) { @@ -112,8 +111,8 @@ public class RestRequest { } private static String getBasicAuthString(ContextBuilder builder) { - String encodedString = Base64.getEncoder().encodeToString(( builder.getUsername() + ":" + - Password.deobfuscate(builder.getPassword())).getBytes()); + String encodedString = Base64.getEncoder() + .encodeToString((builder.getUsername() + ":" + Password.deobfuscate(builder.getPassword())).getBytes()); return BASIC_AUTH + encodedString; } diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java index 8b8f8f6..21f3a3e 100644 --- a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java +++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java @@ -15,8 +15,19 @@ * limitations under the License. * ============LICENSE_END===================================================== */ + package org.onap.pomba.contextaggregator.service; +import com.att.aft.dme2.internal.gson.Gson; +import com.att.aft.dme2.internal.gson.GsonBuilder; +import com.att.aft.dme2.internal.gson.JsonSyntaxException; +import com.att.nsa.apiClient.http.HttpException; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRPublisher; +import com.att.nsa.mr.client.MRTopicManager; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -30,6 +41,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.onap.pomba.common.datatypes.DataQuality; +import org.onap.pomba.common.datatypes.DataQuality.Status; +import org.onap.pomba.common.datatypes.ModelContext; import org.onap.pomba.contextaggregator.builder.ContextBuilder; import org.onap.pomba.contextaggregator.config.EventHeaderConfig; import org.onap.pomba.contextaggregator.datatypes.AggregatedModels; @@ -44,16 +58,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; -import com.att.aft.dme2.internal.gson.Gson; -import com.att.aft.dme2.internal.gson.GsonBuilder; -import com.att.aft.dme2.internal.gson.JsonSyntaxException; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.mr.client.MRBatchingPublisher; -import com.att.nsa.mr.client.MRConsumer; -import com.att.nsa.mr.client.MRPublisher; -import com.att.nsa.mr.client.MRTopicManager; -import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; - @Component public class ContextAggregatorProcessor implements Callable<Void> { @@ -85,15 +89,15 @@ public class ContextAggregatorProcessor implements Callable<Void> { * Parses the consumed event, retrieves model data from all context builders and publishes the * aggregated models to DMaaP. * - * @param payload + * @param payload The event pay load */ public void process(String payload) { - log.debug("Consumed event: " + payload); + log.info("Consumed event: {}", payload); POAEvent event; try { event = parseEvent(payload); - log.debug("Received POA event: " + event.toString()); + log.info("Received POA event: {}", event); } catch (ContextAggregatorException e) { log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage())); // TODO: publish to error topic? @@ -102,14 +106,20 @@ public class ContextAggregatorProcessor implements Callable<Void> { Map<String, String> retrievedModels = new HashMap<>(); for (ContextBuilder builder : contextBuilders) { - log.debug("Retrieving model data for: " + builder.getContextName()); - String modelData = RestRequest.getModelData(builder, event); - if (modelData == null) { - // If one of the Context builder return error, Aggregator will not publish the event - log.info("Error returned from one of the Context builders, no event will be published."); - return; - } else { + try { + log.info("Retrieving model data for: {}", builder.getContextName()); + String modelData = RestRequest.getModelData(builder, event); retrievedModels.put(builder.getContextName(), modelData); + } catch (ContextAggregatorException e) { + DataQuality errorDataQuality = new DataQuality(); + errorDataQuality.setStatus(Status.error); + errorDataQuality.setErrorText(e.getMessage()); + ModelContext modelContext = new ModelContext(); + modelContext.setDataQuality(errorDataQuality); + Gson gsonBuilder = new GsonBuilder().create(); + String errorData = gsonBuilder.toJson(modelContext); + log.error("Setting dataQuality status for '{}' context builder to ERROR: {}.", builder.getContextName(), e.getMessage()); + retrievedModels.put(builder.getContextName(), errorData); } } @@ -164,14 +174,14 @@ public class ContextAggregatorProcessor implements Callable<Void> { } /** - * Publishes the aggregated models + * Publishes the aggregated models. * * @param models * @throws ContextAggregatorException */ private void publishModels(AggregatedModels models) throws ContextAggregatorException { String payload = models.generateJsonPayload(); - log.debug("Publishing models: " + payload); + log.info("Publishing models: {}", payload); retriesRemaining = publisherFactory.getRetries(); publish(Arrays.asList(payload)); } @@ -188,7 +198,7 @@ public class ContextAggregatorProcessor implements Callable<Void> { String partition = publisherFactory.getPartition(); try { ((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition); - final Collection<MRPublisher.message> dmaapMessages = new ArrayList<MRPublisher.message>(); + final Collection<MRPublisher.message> dmaapMessages = new ArrayList<>(); for (final String message : messages) { dmaapMessages.add(new MRPublisher.message(partition, message)); } @@ -223,7 +233,7 @@ public class ContextAggregatorProcessor implements Callable<Void> { } /** - * Retries to publish messages or throws the given exception if no retries are left + * Retries to publish messages or throws the given exception if no retries are left. * * @param messages * @param exceptionToThrow @@ -234,8 +244,8 @@ public class ContextAggregatorProcessor implements Callable<Void> { if (retriesRemaining <= 0) { throw exceptionToThrow; } - log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining, - ((retriesRemaining == 1) ? "retry" : "retries"))); + log.info("Retrying to publish messages ({} {} remaining)...", retriesRemaining, + ((retriesRemaining == 1) ? "retry" : "retries")); retriesRemaining--; publish(messages); } @@ -251,21 +261,21 @@ public class ContextAggregatorProcessor implements Callable<Void> { try { return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList()); } catch (Exception e) { - throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage()); + throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e); } } private List<String> getRequiredTopicList(String messageRouterRequiredPombaTopicList) { - List<String> pombaTopicList = new ArrayList<String>(); + List<String> pombaTopicList = new ArrayList<>(); String noSpacePombaTopicList = messageRouterRequiredPombaTopicList.replaceAll("\\s", ""); String[] pombaTopicStrSet = noSpacePombaTopicList.split(","); for (int i = 0; i < pombaTopicStrSet.length; i++) { - pombaTopicList.add(new String(pombaTopicStrSet[i])); + pombaTopicList.add(pombaTopicStrSet[i]); } return pombaTopicList; } - private void createPombaTopics () { + private void createPombaTopics() { List<String> requiredTopicList = getRequiredTopicList(messageRouterRequiredPombaTopicList); @@ -273,15 +283,16 @@ public class ContextAggregatorProcessor implements Callable<Void> { int partitionCount = 1; int replicationCount = 1; - for ( String topic_required : requiredTopicList) { - try { - messageRouterTopicMgr.createTopic(topic_required, topicDescription, partitionCount, replicationCount); - } catch (HttpException e1) { - log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage())); - } catch (IOException e) { - log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage())); - } - } + for (String topicRequired : requiredTopicList) { + try { + messageRouterTopicMgr.createTopic(topicRequired, topicDescription, partitionCount, replicationCount); + log.info("Created Pomba Topic {}", topicRequired); + } catch (HttpException e1) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e1.getMessage())); + } catch (IOException e) { + log.error(ContextAggregatorError.FAILED_TO_CREATE_POMBA_TOPICS.getMessage(e.getMessage())); + } + } } } |