summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java42
1 files changed, 23 insertions, 19 deletions
diff --git a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java
index df51861b8..5c92a008f 100644
--- a/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java
+++ b/appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/UEBMessagingService.java
@@ -26,25 +26,30 @@ package org.onap.appc.client.impl.protocol;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
import java.io.IOException;
import java.security.GeneralSecurityException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
class UEBMessagingService implements MessagingService {
- private Consumer consumer;
- private Producer producer;
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(UEBMessagingService.class);
- private final String DEFAULT_READ_TIMEOUT_MS = "60000";
- private final String DEFAULT_READ_LIMIT = "1000";
+ private static final String DEFAULT_READ_TIMEOUT_MS = "60000";
+ private static final String DEFAULT_READ_LIMIT = "1000";
+ private Consumer consumer;
+ private Producer producer;
private int readLimit;
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(UEBMessagingService.class);
-
+ @Override
@SuppressWarnings("Since15")
- public void init(Properties props) throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
+ public void init(Properties props)
+ throws IOException, GeneralSecurityException, NoSuchFieldException, IllegalAccessException {
if (props != null) {
String readTopic = props.getProperty(UEBPropertiesKeys.TOPIC_READ);
@@ -56,39 +61,38 @@ class UEBMessagingService implements MessagingService {
String readLimitString = props.getProperty(UEBPropertiesKeys.READ_LIMIT, DEFAULT_READ_LIMIT);
readLimit = Integer.parseInt(readLimitString);
//get hosts pool
- Collection<String> pool = new HashSet<String>();
+ Collection<String> pool = new HashSet<>();
String hostNames = props.getProperty(UEBPropertiesKeys.HOSTS);
if (hostNames != null && !hostNames.isEmpty()) {
- for (String name : hostNames.split(",")) {
- pool.add(name);
- }
+ pool.addAll(Arrays.asList(hostNames.split(",")));
}
-
//generate consumer id and group - same value for both
String consumerName = UUID.randomUUID().toString();
- String consumerID = consumerName;
//create consumer and producer
- consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerID, readTimeout, apiKey, apiSecret);
+ consumer = new ConsumerImpl(pool, readTopic, consumerName, consumerName, readTimeout, apiKey, apiSecret);
producer = new ProducerImpl(pool, writeTopic, apiKey, apiSecret);
//initial consumer registration
try {
consumer.registerForRead();
- }catch(Exception e){
- LOG.error("Message consumer failed to register client "+consumerID);
+ } catch (Exception e) {
+ logger.error("Message consumer failed to register client " + consumerName, e);
}
}
}
+ @Override
public void send(String partition, String body) throws IOException {
producer.post(partition, body);
}
+ @Override
public List<String> fetch() throws IOException {
return consumer.fetch(readLimit);
}
+ @Override
public List<String> fetch(int limit) throws IOException {
return consumer.fetch(limit);
}
@@ -99,4 +103,4 @@ class UEBMessagingService implements MessagingService {
producer.close();
}
-}
+} \ No newline at end of file