diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java | 72 |
1 files changed, 23 insertions, 49 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 164f2b16..f98b481f 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -2,8 +2,9 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +25,9 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; - -import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -41,32 +40,36 @@ import org.slf4j.LoggerFactory; * notifying its listeners. */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource, FilterableTopicSource { + implements Runnable, BusTopicSource { /** * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only * that in a single file in a concise format. */ - private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class); + private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class); /** * Bus consumer group. */ + @Getter protected final String consumerGroup; /** * Bus consumer instance. */ + @Getter protected final String consumerInstance; /** * Bus fetch timeout. */ + @Getter protected final int fetchTimeout; /** * Bus fetch limit. */ + @Getter protected final int fetchLimit; /** @@ -87,19 +90,24 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase * * @throws IllegalArgumentException An invalid parameter passed in */ - public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { + protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) { super(busTopicParams); - if (busTopicParams.isConsumerGroupInvalid()) { + if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = UUID.randomUUID().toString(); - } else { + this.consumerInstance = NetworkUtil.getHostname(); + + } else if (busTopicParams.isConsumerGroupInvalid()) { + this.consumerGroup = UUID.randomUUID().toString(); + this.consumerInstance = busTopicParams.getConsumerInstance(); + + } else if (busTopicParams.isConsumerInstanceInvalid()) { this.consumerGroup = busTopicParams.getConsumerGroup(); - } + this.consumerInstance = UUID.randomUUID().toString(); - if (busTopicParams.isConsumerInstanceInvalid()) { - this.consumerInstance = NetworkUtil.getHostname(); } else { + this.consumerGroup = busTopicParams.getConsumerGroup(); this.consumerInstance = busTopicParams.getConsumerInstance(); } @@ -134,8 +142,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: register: start not attempted", this); } } catch (Exception e) { - logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(), - e); + logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e); } } @@ -176,8 +183,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase busPollerThread.start(); return true; } catch (Exception e) { - logger.warn("{}: cannot start because of {}", this, e.getMessage(), e); - throw new IllegalStateException(e); + throw new IllegalStateException(this + ": cannot start", e); } } } @@ -227,7 +233,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase try { fetchAllMessages(); } catch (IOException | RuntimeException e) { - logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); + logger.error("{}: cannot fetch", this, e); } } @@ -265,17 +271,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return broadcast(event); } - - @Override - public void setFilter(String filter) { - if (consumer instanceof FilterableBusConsumer) { - ((FilterableBusConsumer) consumer).setFilter(filter); - - } else { - throw new UnsupportedOperationException("no server-side filtering for topic " + topic); - } - } - @Override public String toString() { return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance @@ -285,29 +280,8 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } @Override - public String getConsumerGroup() { - return consumerGroup; - } - - @Override - public String getConsumerInstance() { - return consumerInstance; - } - - @Override public void shutdown() { this.stop(); this.topicListeners.clear(); } - - @Override - public int getFetchTimeout() { - return fetchTimeout; - } - - @Override - public int getFetchLimit() { - return fetchLimit; - } - } |