diff options
Diffstat (limited to 'feature-pooling-messages')
8 files changed, 54 insertions, 49 deletions
diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java index a1368d8c..25579170 100644 --- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java @@ -28,10 +28,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import lombok.AccessLevel; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicEndpointManager; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.common.utils.properties.SpecProperties; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index fec6ba81..e613787b 100644 --- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -28,8 +28,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicListener; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Leader; @@ -419,8 +419,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { * @param topic2 topic * @param event event, as an object * @param eventHashCode event's hash code - * @return {@code true} if the event was handled, {@code false} if the invoker should - * handle it + * @return {@code true} if the event was handled, {@code false} if the invoker should handle it */ private boolean handleExternal(String topic2, Object event, int eventHashCode) { if (assignments == null) { @@ -436,7 +435,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { } /** - * Handles a {@link Forward} event, possibly forwarding it again. + * Handles a forward event, possibly forwarding it again. * * @param topic2 topic * @param event event, as an object diff --git a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java index 4c9b3f34..b2986e65 100644 --- a/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java +++ b/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/TopicMessageManager.java @@ -23,11 +23,11 @@ package org.onap.policy.drools.pooling; import java.util.List; import lombok.Getter; -import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.message.bus.event.TopicEndpoint; +import org.onap.policy.common.message.bus.event.TopicEndpointManager; +import org.onap.policy.common.message.bus.event.TopicListener; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java index 1ad5d513..b487762a 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/EndToEndFeatureTest.java @@ -27,6 +27,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SINK_TOPICS; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_KAFKA_SOURCE_TOPICS; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX; +import static org.onap.policy.common.message.bus.properties.MessageBusProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX; import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX; import com.google.gson.Gson; @@ -50,12 +57,11 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicEndpointManager; +import org.onap.policy.common.message.bus.event.TopicListener; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; @@ -237,14 +243,14 @@ public class EndToEndFeatureTest { private static Properties makeSinkProperties(String topic) { Properties props = new Properties(); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS, topic); + props.setProperty(PROPERTY_KAFKA_SINK_TOPICS, topic); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); + props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic + + PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic + + PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0"); + props.setProperty(PROPERTY_KAFKA_SINK_TOPICS + "." + topic + + PROPERTY_MANAGED_SUFFIX, "false"); return props; } @@ -252,19 +258,19 @@ public class EndToEndFeatureTest { private static Properties makeSourceProperties(String topic) { Properties props = new Properties(); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS, topic); + props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS, topic); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false"); + props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic + + PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic + + PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT); + props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic + + PROPERTY_MANAGED_SUFFIX, "false"); if (EXTERNAL_TOPIC.equals(topic)) { // consumer group is a constant - props.setProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic - + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP); + props.setProperty(PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic + + PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, EXTERNAL_GROUP); // consumer instance is generated by the BusConsumer code } diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java index f1b48b3e..f4e74375 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java @@ -53,11 +53,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.onap.policy.common.endpoints.event.comm.Topic; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.message.bus.event.Topic; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicListener; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java index 1eb9e361..f288767d 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java @@ -43,9 +43,9 @@ import java.util.concurrent.CountDownLatch; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngine; diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index 98300683..65b52eb6 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -47,8 +47,8 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; -import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure; +import org.onap.policy.common.message.bus.event.TopicListener; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.pooling.message.BucketAssignments; import org.onap.policy.drools.pooling.message.Heartbeat; diff --git a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java index 74098487..82fe3a8f 100644 --- a/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java +++ b/feature-pooling-messages/src/test/java/org/onap/policy/drools/pooling/TopicMessageManagerTest.java @@ -40,9 +40,9 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.onap.policy.common.endpoints.event.comm.TopicListener; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.message.bus.event.TopicListener; +import org.onap.policy.common.message.bus.event.TopicSink; +import org.onap.policy.common.message.bus.event.TopicSource; class TopicMessageManagerTest { |