aboutsummaryrefslogtreecommitdiffstats
path: root/common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java')
-rw-r--r--common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java37
1 files changed, 16 insertions, 21 deletions
diff --git a/common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java b/common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java
index 033951612d..6a01fb61ba 100644
--- a/common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java
+++ b/common/src/main/java/org/openecomp/mso/client/dmaap/DmaapConsumer.java
@@ -20,29 +20,25 @@
package org.openecomp.mso.client.dmaap;
-import java.io.FileNotFoundException;
+import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
-
import org.openecomp.mso.client.dmaap.exceptions.DMaaPConsumerFailure;
import org.openecomp.mso.client.dmaap.exceptions.ExceededMaximumPollingTime;
import org.openecomp.mso.client.dmaap.rest.RestConsumer;
-import com.google.common.base.Stopwatch;
-
public abstract class DmaapConsumer extends DmaapClient {
- public DmaapConsumer() throws FileNotFoundException, IOException {
+ public DmaapConsumer() throws IOException {
super("dmaap/default-consumer.properties");
}
-
- public Consumer getConsumer() throws FileNotFoundException, IOException {
+
+ public Consumer getConsumer() {
return new RestConsumer(this.properties);
}
+
public boolean consume() throws Exception {
-
- Consumer mrConsumer = this.getConsumer();
- int iterations = 0;
+ Consumer mrConsumer = this.getConsumer();
boolean accepted = false;
Stopwatch stopwatch = Stopwatch.createUnstarted();
try {
@@ -59,32 +55,28 @@ public abstract class DmaapConsumer extends DmaapClient {
if (!accepted && this.isAccepted(message)) {
auditLogger.info("accepted message found for " + this.getRequestId() + " on " + this.getTopic());
accepted = true;
- }
+ }
if (accepted) {
+ auditLogger.info("received dmaap message: " + message);
if (this.isFailure(message)) {
this.stopProcessingMessages();
- auditLogger.info("received dmaap message: " + message);
final String errorMsg = "failure received from dmaap topic " + this.getTopic();
auditLogger.error(errorMsg);
throw new DMaaPConsumerFailure(errorMsg);
} else {
- auditLogger.info("received dmaap message: " + message);
this.processMessage(message);
}
}
}
- iterations++;
}
return true;
- } catch (Exception e ) {
- throw e;
} finally {
if (stopwatch.isRunning()) {
stopwatch.stop();
}
}
}
-
+
/**
* Should this consumer continue to consume messages from the topic?
* @return
@@ -92,7 +84,7 @@ public abstract class DmaapConsumer extends DmaapClient {
public abstract boolean continuePolling();
/**
* Process a message from a DMaaP topic
- *
+ *
* @param message
* @throws Exception
*/
@@ -100,14 +92,14 @@ public abstract class DmaapConsumer extends DmaapClient {
/**
* Has the request been accepted by the receiving system?
* Should the consumer move to processing messages?
- *
+ *
* @param message
* @return
*/
public abstract boolean isAccepted(String message);
/**
* has the request failed?
- *
+ *
* @param message
* @return
*/
@@ -121,11 +113,14 @@ public abstract class DmaapConsumer extends DmaapClient {
* Logic that defines when the consumer should stop processing messages
*/
public abstract void stopProcessingMessages();
-
+
/**
* time in milliseconds
*/
public int getMaximumElapsedTime() {
return 180000;
}
+
+
+
}