diff options
Diffstat (limited to 'bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap')
3 files changed, 104 insertions, 0 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/Consumer.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/Consumer.java new file mode 100644 index 0000000000..6c0b3ac633 --- /dev/null +++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/Consumer.java @@ -0,0 +1,34 @@ +package org.openecomp.mso.client.dmaap; + +public interface Consumer { + + /** + * Should this consumer continue to consume messages from the topic? + * @return + */ + public boolean continuePolling(); + /** + * Process a message from a DMaaP topic + * + * @param message + * @throws Exception + */ + public void processMessage(String message) throws Exception; + /** + * Has the request been accepted by the receiving system? + * Should the consumer move to processing messages? + * + * @param message + * @return + */ + public boolean isAccepted(String message); + /** + * The request id to filter messages on + * @return + */ + public String getRequestId(); + /** + * Logic that defines when the consumer should stop processing messages + */ + public void stopProcessingMessages(); +} diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java new file mode 100644 index 0000000000..07ed8719b7 --- /dev/null +++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java @@ -0,0 +1,40 @@ +package org.openecomp.mso.client.dmaap; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class DmaapConsumer { + + private final MRConsumer mrConsumer; + public DmaapConsumer() { + mrConsumer = null; + } + public DmaapConsumer (String filepath) throws FileNotFoundException, IOException { + + mrConsumer = MRClientFactory.createConsumer(filepath); + } + + + public MRConsumer getMRConsumer() { + return mrConsumer; + } + public boolean consume(Consumer consumer) throws Exception { + boolean accepted = false; + while (consumer.continuePolling()) { + for (String message : this.getMRConsumer().fetch()) { + if (!accepted && consumer.isAccepted(message)) { + accepted = true; + } + if (accepted) { + consumer.processMessage(message); + } + } + } + + return true; + } + +} diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapPublisher.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapPublisher.java new file mode 100644 index 0000000000..bce3e6c3d7 --- /dev/null +++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/client/dmaap/DmaapPublisher.java @@ -0,0 +1,30 @@ +package org.openecomp.mso.client.dmaap; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; + +public class DmaapPublisher { + + private final long seconds; + private final MRBatchingPublisher publisher; + + public DmaapPublisher(String filepath) throws FileNotFoundException, IOException { + this.seconds = 20; + this.publisher = MRClientFactory.createBatchingPublisher(filepath); + } + + public DmaapPublisher(String filepath, long seconds) throws FileNotFoundException, IOException { + this.seconds = seconds; + this.publisher = MRClientFactory.createBatchingPublisher(filepath); + } + + public void send(String json) throws IOException, InterruptedException { + publisher.send(json); + publisher.close(seconds, TimeUnit.SECONDS); + } + +} |