aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java75
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java105
2 files changed, 84 insertions, 96 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 3d939392..f2277be3 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* A state in the finite state machine.
- *
+ *
* <p>A state may have several timers associated with it, which must be cancelled whenever
* the state is changed. Assumes that timers are not continuously added to the same state.
*/
@@ -63,7 +63,7 @@ public abstract class State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public State(PoolingManager mgr) {
@@ -74,7 +74,7 @@ public abstract class State {
* Gets the server-side filter to use when polling the DMaaP internal topic. The
* default method returns a filter that accepts messages on the admin channel and on
* the host's own channel.
- *
+ *
* @return the server-side filter to use.
*/
@SuppressWarnings("unchecked")
@@ -98,7 +98,7 @@ public abstract class State {
/**
* Transitions to the "start" state.
- *
+ *
* @return the new state
*/
public final State goStart() {
@@ -107,7 +107,7 @@ public abstract class State {
/**
* Transitions to the "query" state.
- *
+ *
* @return the new state
*/
public State goQuery() {
@@ -116,7 +116,7 @@ public abstract class State {
/**
* Goes active with a new set of assignments.
- *
+ *
* @param asgn new assignments
* @return the new state, either Active or Inactive, depending on whether or not this
* host has an assignment
@@ -134,7 +134,7 @@ public abstract class State {
/**
* Transitions to the "inactive" state.
- *
+ *
* @return the new state
*/
protected State goInactive() {
@@ -144,25 +144,26 @@ public abstract class State {
/**
* Processes a message. The default method passes it to the manager to handle and
* returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
- if (!getHost().equals(msg.getChannel())) {
+ if (getHost().equals(msg.getChannel())) {
+ logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
+ mgr.handle(msg);
+
+ } else {
logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
getTopic());
- return null;
}
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -173,7 +174,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -185,7 +186,7 @@ public abstract class State {
/**
* Processes a message. The default method copies the assignments and then returns
* {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -200,7 +201,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -211,7 +212,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -222,7 +223,7 @@ public abstract class State {
/**
* Determines if a message is valid and did not originate from this host.
- *
+ *
* @param msg message to be validated
* @return {@code true} if the message is valid, {@code false} otherwise
*/
@@ -252,7 +253,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Identification msg) {
@@ -261,7 +262,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Leader msg) {
@@ -270,7 +271,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Offline msg) {
@@ -279,7 +280,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Query msg) {
@@ -288,7 +289,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
- *
+ *
* @param channel channel
* @param msg message to be published
*/
@@ -298,7 +299,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
- *
+ *
* @param channel channel
* @param msg message to be published
*/
@@ -308,7 +309,7 @@ public abstract class State {
/**
* Starts distributing messages using the specified bucket assignments.
- *
+ *
* @param assignments assignments
*/
protected final void startDistributing(BucketAssignments assignments) {
@@ -319,7 +320,7 @@ public abstract class State {
/**
* Schedules a timer to fire after a delay.
- *
+ *
* @param delayMs delay in ms
* @param task task
*/
@@ -329,7 +330,7 @@ public abstract class State {
/**
* Schedules a timer to fire repeatedly.
- *
+ *
* @param initialDelayMs initial delay ms
* @param delayMs delay ms
* @param task task
@@ -342,7 +343,7 @@ public abstract class State {
* Indicates that we failed to see our own heartbeat; must be a problem with the
* internal topic. Assumes the problem is temporary and continues to use the current
* bucket assignments.
- *
+ *
* @return a new {@link StartState}
*/
protected final State missedHeartbeat() {
@@ -354,7 +355,7 @@ public abstract class State {
/**
* Indicates that the internal topic failed; this should only be invoked from the
* StartState. Discards bucket assignments and begins processing everything locally.
- *
+ *
* @return a new {@link InactiveState}
*/
protected final State internalTopicFailed() {
@@ -366,9 +367,9 @@ public abstract class State {
/**
* Makes a heart beat message.
- *
+ *
* @param timestampMs time, in milliseconds, associated with the message
- *
+ *
* @return a new message
*/
protected final Heartbeat makeHeartbeat(long timestampMs) {
@@ -377,7 +378,7 @@ public abstract class State {
/**
* Makes an Identification message.
- *
+ *
* @return a new message
*/
protected Identification makeIdentification() {
@@ -386,7 +387,7 @@ public abstract class State {
/**
* Makes an "offline" message.
- *
+ *
* @return a new message
*/
protected final Offline makeOffline() {
@@ -395,7 +396,7 @@ public abstract class State {
/**
* Makes a query message.
- *
+ *
* @return a new message
*/
protected final Query makeQuery() {
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index bd3d90b6..2a0066b7 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -640,27 +640,17 @@ public class PoolingManagerImplTest {
@Test
public void testBeforeInsert_NoIntercept() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateUnhandled(CommInfrastructure.UEB);
}
@Test
public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception {
- startMgr();
-
- when(extractors.extract(any())).thenReturn(null);
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateHandleReqId(null);
}
@Test
public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception {
- startMgr();
-
- when(extractors.extract(any())).thenReturn("");
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateHandleReqId("");
}
@Test
@@ -672,50 +662,28 @@ public class PoolingManagerImplTest {
@Test
public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateUnhandled(CommInfrastructure.UEB);
}
@Test
public void testHandleExternalForward_NoAssignments() throws Exception {
- startMgr();
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateUnhandled(CommInfrastructure.UEB);
}
@Test
public void testHandleExternalForward() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateNoForward();
}
@Test
public void testHandleEvent_NullTarget() throws Exception {
- startMgr();
-
// buckets have null targets
- mgr.startDistributing(new BucketAssignments(new String[] {null, null}));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB)).publish(any());
+ validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
}
@Test
public void testHandleEvent_SameHost() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB)).publish(any());
+ validateNoForward();
}
@Test
@@ -736,14 +704,7 @@ public class PoolingManagerImplTest {
@Test
public void testHandleEvent_DiffHost_Forward() throws Exception {
- startMgr();
-
- // route the message to the *OTHER* host
- mgr.startDistributing(makeAssignments(false));
-
- assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
- verify(dmaap, times(START_PUB + 1)).publish(any());
+ validateHandled(makeAssignments(false), START_PUB + 1);
}
@Test
@@ -755,11 +716,7 @@ public class PoolingManagerImplTest {
@Test
public void testExtractRequestId_NullReqId() throws Exception {
- startMgr();
-
- when(extractors.extract(any())).thenReturn(null);
-
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ validateHandleReqId(null);
}
@Test
@@ -1009,12 +966,7 @@ public class PoolingManagerImplTest {
@Test
public void testStartDistributing() throws Exception {
- startMgr();
-
- // route the message to this host
- mgr.startDistributing(makeAssignments(true));
- assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
- verify(dmaap, times(START_PUB)).publish(any());
+ validateNoForward();
// null assignments should cause message to be processed locally
@@ -1133,6 +1085,41 @@ public class PoolingManagerImplTest {
assertEquals(1, latch.getCount());
}
+ private void validateHandleReqId(String requestId) throws PoolingFeatureException {
+ startMgr();
+
+ when(extractors.extract(any())).thenReturn(requestId);
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
+ private void validateNoForward() throws PoolingFeatureException {
+ startMgr();
+
+ // route the message to this host
+ mgr.startDistributing(makeAssignments(true));
+
+ assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ verify(dmaap, times(START_PUB)).publish(any());
+ }
+
+ private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+ startMgr();
+
+ // route the message to the *OTHER* host
+ mgr.startDistributing(assignments);
+
+ assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+
+ verify(dmaap, times(publishCount)).publish(any());
+ }
+
+ private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+ startMgr();
+ assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
+ }
+
/**
* Configure the mock controller to act like a real controller, invoking beforeOffer
* and then beforeInsert, so we can make sure they pass through. We'll keep count to