diff options
Diffstat (limited to 'feature-pooling-dmaap/src')
2 files changed, 84 insertions, 96 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 3d939392..f2277be3 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * A state in the finite state machine. - * + * * <p>A state may have several timers associated with it, which must be cancelled whenever * the state is changed. Assumes that timers are not continuously added to the same state. */ @@ -63,7 +63,7 @@ public abstract class State { /** * Constructor. - * + * * @param mgr pooling manager */ public State(PoolingManager mgr) { @@ -74,7 +74,7 @@ public abstract class State { * Gets the server-side filter to use when polling the DMaaP internal topic. The * default method returns a filter that accepts messages on the admin channel and on * the host's own channel. - * + * * @return the server-side filter to use. */ @SuppressWarnings("unchecked") @@ -98,7 +98,7 @@ public abstract class State { /** * Transitions to the "start" state. - * + * * @return the new state */ public final State goStart() { @@ -107,7 +107,7 @@ public abstract class State { /** * Transitions to the "query" state. - * + * * @return the new state */ public State goQuery() { @@ -116,7 +116,7 @@ public abstract class State { /** * Goes active with a new set of assignments. - * + * * @param asgn new assignments * @return the new state, either Active or Inactive, depending on whether or not this * host has an assignment @@ -134,7 +134,7 @@ public abstract class State { /** * Transitions to the "inactive" state. - * + * * @return the new state */ protected State goInactive() { @@ -144,25 +144,26 @@ public abstract class State { /** * Processes a message. The default method passes it to the manager to handle and * returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { - if (!getHost().equals(msg.getChannel())) { + if (getHost().equals(msg.getChannel())) { + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); + mgr.handle(msg); + + } else { logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic()); - return null; } - logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); - mgr.handle(msg); return null; } /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -173,7 +174,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -185,7 +186,7 @@ public abstract class State { /** * Processes a message. The default method copies the assignments and then returns * {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -200,7 +201,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -211,7 +212,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -222,7 +223,7 @@ public abstract class State { /** * Determines if a message is valid and did not originate from this host. - * + * * @param msg message to be validated * @return {@code true} if the message is valid, {@code false} otherwise */ @@ -252,7 +253,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Identification msg) { @@ -261,7 +262,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Leader msg) { @@ -270,7 +271,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Offline msg) { @@ -279,7 +280,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Query msg) { @@ -288,7 +289,7 @@ public abstract class State { /** * Publishes a message on the specified channel. - * + * * @param channel channel * @param msg message to be published */ @@ -298,7 +299,7 @@ public abstract class State { /** * Publishes a message on the specified channel. - * + * * @param channel channel * @param msg message to be published */ @@ -308,7 +309,7 @@ public abstract class State { /** * Starts distributing messages using the specified bucket assignments. - * + * * @param assignments assignments */ protected final void startDistributing(BucketAssignments assignments) { @@ -319,7 +320,7 @@ public abstract class State { /** * Schedules a timer to fire after a delay. - * + * * @param delayMs delay in ms * @param task task */ @@ -329,7 +330,7 @@ public abstract class State { /** * Schedules a timer to fire repeatedly. - * + * * @param initialDelayMs initial delay ms * @param delayMs delay ms * @param task task @@ -342,7 +343,7 @@ public abstract class State { * Indicates that we failed to see our own heartbeat; must be a problem with the * internal topic. Assumes the problem is temporary and continues to use the current * bucket assignments. - * + * * @return a new {@link StartState} */ protected final State missedHeartbeat() { @@ -354,7 +355,7 @@ public abstract class State { /** * Indicates that the internal topic failed; this should only be invoked from the * StartState. Discards bucket assignments and begins processing everything locally. - * + * * @return a new {@link InactiveState} */ protected final State internalTopicFailed() { @@ -366,9 +367,9 @@ public abstract class State { /** * Makes a heart beat message. - * + * * @param timestampMs time, in milliseconds, associated with the message - * + * * @return a new message */ protected final Heartbeat makeHeartbeat(long timestampMs) { @@ -377,7 +378,7 @@ public abstract class State { /** * Makes an Identification message. - * + * * @return a new message */ protected Identification makeIdentification() { @@ -386,7 +387,7 @@ public abstract class State { /** * Makes an "offline" message. - * + * * @return a new message */ protected final Offline makeOffline() { @@ -395,7 +396,7 @@ public abstract class State { /** * Makes a query message. - * + * * @return a new message */ protected final Query makeQuery() { diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java index bd3d90b6..2a0066b7 100644 --- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java +++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java @@ -640,27 +640,17 @@ public class PoolingManagerImplTest { @Test public void testBeforeInsert_NoIntercept() throws Exception { - startMgr(); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateUnhandled(CommInfrastructure.UEB); } @Test public void testHandleExternalCommInfrastructureStringStringString_NullReqId() throws Exception { - startMgr(); - - when(extractors.extract(any())).thenReturn(null); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateHandleReqId(null); } @Test public void testHandleExternalCommInfrastructureStringStringString_EmptyReqId() throws Exception { - startMgr(); - - when(extractors.extract(any())).thenReturn(""); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateHandleReqId(""); } @Test @@ -672,50 +662,28 @@ public class PoolingManagerImplTest { @Test public void testHandleExternalCommInfrastructureStringStringString() throws Exception { - startMgr(); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateUnhandled(CommInfrastructure.UEB); } @Test public void testHandleExternalForward_NoAssignments() throws Exception { - startMgr(); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateUnhandled(CommInfrastructure.UEB); } @Test public void testHandleExternalForward() throws Exception { - startMgr(); - - // route the message to this host - mgr.startDistributing(makeAssignments(true)); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateNoForward(); } @Test public void testHandleEvent_NullTarget() throws Exception { - startMgr(); - // buckets have null targets - mgr.startDistributing(new BucketAssignments(new String[] {null, null})); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - verify(dmaap, times(START_PUB)).publish(any()); + validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB); } @Test public void testHandleEvent_SameHost() throws Exception { - startMgr(); - - // route the message to this host - mgr.startDistributing(makeAssignments(true)); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - verify(dmaap, times(START_PUB)).publish(any()); + validateNoForward(); } @Test @@ -736,14 +704,7 @@ public class PoolingManagerImplTest { @Test public void testHandleEvent_DiffHost_Forward() throws Exception { - startMgr(); - - // route the message to the *OTHER* host - mgr.startDistributing(makeAssignments(false)); - - assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - - verify(dmaap, times(START_PUB + 1)).publish(any()); + validateHandled(makeAssignments(false), START_PUB + 1); } @Test @@ -755,11 +716,7 @@ public class PoolingManagerImplTest { @Test public void testExtractRequestId_NullReqId() throws Exception { - startMgr(); - - when(extractors.extract(any())).thenReturn(null); - - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + validateHandleReqId(null); } @Test @@ -1009,12 +966,7 @@ public class PoolingManagerImplTest { @Test public void testStartDistributing() throws Exception { - startMgr(); - - // route the message to this host - mgr.startDistributing(makeAssignments(true)); - assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); - verify(dmaap, times(START_PUB)).publish(any()); + validateNoForward(); // null assignments should cause message to be processed locally @@ -1133,6 +1085,41 @@ public class PoolingManagerImplTest { assertEquals(1, latch.getCount()); } + private void validateHandleReqId(String requestId) throws PoolingFeatureException { + startMgr(); + + when(extractors.extract(any())).thenReturn(requestId); + + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + } + + private void validateNoForward() throws PoolingFeatureException { + startMgr(); + + // route the message to this host + mgr.startDistributing(makeAssignments(true)); + + assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + + verify(dmaap, times(START_PUB)).publish(any()); + } + + private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException { + startMgr(); + + // route the message to the *OTHER* host + mgr.startDistributing(assignments); + + assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT)); + + verify(dmaap, times(publishCount)).publish(any()); + } + + private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException { + startMgr(); + assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT)); + } + /** * Configure the mock controller to act like a real controller, invoking beforeOffer * and then beforeInsert, so we can make sure they pass through. We'll keep count to |