diff options
Diffstat (limited to 'bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java')
-rw-r--r-- | bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java new file mode 100644 index 0000000000..07ed8719b7 --- /dev/null +++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java @@ -0,0 +1,40 @@ +package org.openecomp.mso.client.dmaap; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class DmaapConsumer { + + private final MRConsumer mrConsumer; + public DmaapConsumer() { + mrConsumer = null; + } + public DmaapConsumer (String filepath) throws FileNotFoundException, IOException { + + mrConsumer = MRClientFactory.createConsumer(filepath); + } + + + public MRConsumer getMRConsumer() { + return mrConsumer; + } + public boolean consume(Consumer consumer) throws Exception { + boolean accepted = false; + while (consumer.continuePolling()) { + for (String message : this.getMRConsumer().fetch()) { + if (!accepted && consumer.isAccepted(message)) { + accepted = true; + } + if (accepted) { + consumer.processMessage(message); + } + } + } + + return true; + } + +} |