diff options
Diffstat (limited to 'bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java')
-rw-r--r-- | bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java | 117 |
1 files changed, 94 insertions, 23 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java index 2a618763ff..033951612d 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java @@ -22,39 +22,110 @@ package org.openecomp.mso.client.dmaap; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import com.att.nsa.mr.client.MRClientFactory; -import com.att.nsa.mr.client.MRConsumer; +import org.openecomp.mso.client.dmaap.exceptions.DMaaPConsumerFailure; +import org.openecomp.mso.client.dmaap.exceptions.ExceededMaximumPollingTime; +import org.openecomp.mso.client.dmaap.rest.RestConsumer; -public class DmaapConsumer { +import com.google.common.base.Stopwatch; - private final MRConsumer mrConsumer; - public DmaapConsumer() { - mrConsumer = null; - } - public DmaapConsumer (String filepath) throws FileNotFoundException, IOException { - - mrConsumer = MRClientFactory.createConsumer(filepath); +public abstract class DmaapConsumer extends DmaapClient { + + public DmaapConsumer() throws FileNotFoundException, IOException { + super("dmaap/default-consumer.properties"); } - - public MRConsumer getMRConsumer() { - return mrConsumer; + public Consumer getConsumer() throws FileNotFoundException, IOException { + return new RestConsumer(this.properties); } - public boolean consume(Consumer consumer) throws Exception { + public boolean consume() throws Exception { + + Consumer mrConsumer = this.getConsumer(); + int iterations = 0; boolean accepted = false; - while (consumer.continuePolling()) { - for (String message : this.getMRConsumer().fetch()) { - if (!accepted && consumer.isAccepted(message)) { - accepted = true; - } - if (accepted) { - consumer.processMessage(message); + Stopwatch stopwatch = Stopwatch.createUnstarted(); + try { + while (this.continuePolling()) { + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) >= this.getMaximumElapsedTime()) { + final String message = "exceeded maximum retries on " + this.getRequestId() + " on " + this.getTopic(); + auditLogger.error(message); + throw new ExceededMaximumPollingTime(message); } + stopwatch.start(); + Iterable<String> itr = mrConsumer.fetch(); + stopwatch.stop(); + for (String message : itr) { + if (!accepted && this.isAccepted(message)) { + auditLogger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic()); + accepted = true; + } + if (accepted) { + if (this.isFailure(message)) { + this.stopProcessingMessages(); + auditLogger.info("received dmaap message: " + message); + final String errorMsg = "failure received from dmaap topic " + this.getTopic(); + auditLogger.error(errorMsg); + throw new DMaaPConsumerFailure(errorMsg); + } else { + auditLogger.info("received dmaap message: " + message); + this.processMessage(message); + } + } + } + iterations++; + } + return true; + } catch (Exception e ) { + throw e; + } finally { + if (stopwatch.isRunning()) { + stopwatch.stop(); } } - - return true; } + /** + * Should this consumer continue to consume messages from the topic? + * @return + */ + public abstract boolean continuePolling(); + /** + * Process a message from a DMaaP topic + * + * @param message + * @throws Exception + */ + public abstract void processMessage(String message) throws Exception; + /** + * Has the request been accepted by the receiving system? + * Should the consumer move to processing messages? + * + * @param message + * @return + */ + public abstract boolean isAccepted(String message); + /** + * has the request failed? + * + * @param message + * @return + */ + public abstract boolean isFailure(String message); + /** + * The request id to filter messages on + * @return + */ + public abstract String getRequestId(); + /** + * Logic that defines when the consumer should stop processing messages + */ + public abstract void stopProcessingMessages(); + + /** + * time in milliseconds + */ + public int getMaximumElapsedTime() { + return 180000; + } } |