aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java72
1 files changed, 23 insertions, 49 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 164f2b16..f98b481f 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -2,8 +2,9 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,11 +25,9 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.UUID;
-
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
-import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
@@ -41,32 +40,36 @@ import org.slf4j.LoggerFactory;
* notifying its listeners.
*/
public abstract class SingleThreadedBusTopicSource extends BusTopicBase
- implements Runnable, BusTopicSource, FilterableTopicSource {
+ implements Runnable, BusTopicSource {
/**
* Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
* that in a single file in a concise format.
*/
- private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
+ private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
/**
* Bus consumer group.
*/
+ @Getter
protected final String consumerGroup;
/**
* Bus consumer instance.
*/
+ @Getter
protected final String consumerInstance;
/**
* Bus fetch timeout.
*/
+ @Getter
protected final int fetchTimeout;
/**
* Bus fetch limit.
*/
+ @Getter
protected final int fetchLimit;
/**
@@ -87,19 +90,24 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
*
* @throws IllegalArgumentException An invalid parameter passed in
*/
- public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
+ protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
super(busTopicParams);
- if (busTopicParams.isConsumerGroupInvalid()) {
+ if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) {
this.consumerGroup = UUID.randomUUID().toString();
- } else {
+ this.consumerInstance = NetworkUtil.getHostname();
+
+ } else if (busTopicParams.isConsumerGroupInvalid()) {
+ this.consumerGroup = UUID.randomUUID().toString();
+ this.consumerInstance = busTopicParams.getConsumerInstance();
+
+ } else if (busTopicParams.isConsumerInstanceInvalid()) {
this.consumerGroup = busTopicParams.getConsumerGroup();
- }
+ this.consumerInstance = UUID.randomUUID().toString();
- if (busTopicParams.isConsumerInstanceInvalid()) {
- this.consumerInstance = NetworkUtil.getHostname();
} else {
+ this.consumerGroup = busTopicParams.getConsumerGroup();
this.consumerInstance = busTopicParams.getConsumerInstance();
}
@@ -134,8 +142,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
logger.info("{}: register: start not attempted", this);
}
} catch (Exception e) {
- logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
- e);
+ logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
}
}
@@ -176,8 +183,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
busPollerThread.start();
return true;
} catch (Exception e) {
- logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
- throw new IllegalStateException(e);
+ throw new IllegalStateException(this + ": cannot start", e);
}
}
}
@@ -227,7 +233,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
try {
fetchAllMessages();
} catch (IOException | RuntimeException e) {
- logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
+ logger.error("{}: cannot fetch", this, e);
}
}
@@ -265,17 +271,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return broadcast(event);
}
-
- @Override
- public void setFilter(String filter) {
- if (consumer instanceof FilterableBusConsumer) {
- ((FilterableBusConsumer) consumer).setFilter(filter);
-
- } else {
- throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
- }
- }
-
@Override
public String toString() {
return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
@@ -285,29 +280,8 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
}
@Override
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
- @Override
- public String getConsumerInstance() {
- return consumerInstance;
- }
-
- @Override
public void shutdown() {
this.stop();
this.topicListeners.clear();
}
-
- @Override
- public int getFetchTimeout() {
- return fetchTimeout;
- }
-
- @Override
- public int getFetchLimit() {
- return fetchLimit;
- }
-
}