aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java31
1 files changed, 16 insertions, 15 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index 6122c5d0..edb03bba 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -134,16 +134,12 @@ public interface BusConsumer {
}
}
- /**
- * {@inheritDoc}
- */
+ @Override
public Iterable<String> fetch() throws IOException {
return this.consumer.fetch();
}
- /**
- * {@inheritDoc}
- */
+ @Override
public void close() {
this.consumer.close();
}
@@ -159,9 +155,19 @@ public interface BusConsumer {
*/
public abstract class DmaapConsumerWrapper implements BusConsumer {
+ /**
+ * logger
+ */
private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+ /**
+ * fetch timeout
+ */
protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
protected Object closeCondition = new Object();
/**
@@ -206,9 +212,6 @@ public interface BusConsumer {
this.consumer.setPassword(password);
}
- /**
- * {@inheritDoc}
- */
@Override
public Iterable<String> fetch() throws InterruptedException, IOException {
MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
@@ -218,7 +221,7 @@ public interface BusConsumer {
synchronized (closeCondition) {
closeCondition.wait(fetchTimeout);
}
- return new ArrayList<String>();
+ return new ArrayList<>();
} else {
logger.debug("DMaaP consumer received {} : {}" +
response.getResponseCode(),
@@ -234,19 +237,17 @@ public interface BusConsumer {
synchronized (closeCondition) {
closeCondition.wait(fetchTimeout);
}
+
/* fall through */
}
}
if (response.getActualMessages() == null)
- return new ArrayList<String>();
+ return new ArrayList<>();
else
return response.getActualMessages();
}
- /**
- * {@inheritDoc}
- */
@Override
public void close() {
synchronized (closeCondition) {
@@ -308,7 +309,7 @@ public interface BusConsumer {
// super constructor sets servers = {""} if empty to avoid errors when using DME2
if ((servers.size() == 1 && servers.get(0).equals("")) ||
- (servers == null) || (servers.size() == 0)) {
+ (servers == null) || (servers.isEmpty())) {
throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
}