diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap')
-rw-r--r-- | feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java | 75 |
1 files changed, 38 insertions, 37 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java index 3d939392..f2277be3 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory; /** * A state in the finite state machine. - * + * * <p>A state may have several timers associated with it, which must be cancelled whenever * the state is changed. Assumes that timers are not continuously added to the same state. */ @@ -63,7 +63,7 @@ public abstract class State { /** * Constructor. - * + * * @param mgr pooling manager */ public State(PoolingManager mgr) { @@ -74,7 +74,7 @@ public abstract class State { * Gets the server-side filter to use when polling the DMaaP internal topic. The * default method returns a filter that accepts messages on the admin channel and on * the host's own channel. - * + * * @return the server-side filter to use. */ @SuppressWarnings("unchecked") @@ -98,7 +98,7 @@ public abstract class State { /** * Transitions to the "start" state. - * + * * @return the new state */ public final State goStart() { @@ -107,7 +107,7 @@ public abstract class State { /** * Transitions to the "query" state. - * + * * @return the new state */ public State goQuery() { @@ -116,7 +116,7 @@ public abstract class State { /** * Goes active with a new set of assignments. - * + * * @param asgn new assignments * @return the new state, either Active or Inactive, depending on whether or not this * host has an assignment @@ -134,7 +134,7 @@ public abstract class State { /** * Transitions to the "inactive" state. - * + * * @return the new state */ protected State goInactive() { @@ -144,25 +144,26 @@ public abstract class State { /** * Processes a message. The default method passes it to the manager to handle and * returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ public State process(Forward msg) { - if (!getHost().equals(msg.getChannel())) { + if (getHost().equals(msg.getChannel())) { + logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); + mgr.handle(msg); + + } else { logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic()); - return null; } - logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic()); - mgr.handle(msg); return null; } /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -173,7 +174,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -185,7 +186,7 @@ public abstract class State { /** * Processes a message. The default method copies the assignments and then returns * {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -200,7 +201,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -211,7 +212,7 @@ public abstract class State { /** * Processes a message. The default method just returns {@code null}. - * + * * @param msg message to be processed * @return the new state, or {@code null} if the state is unchanged */ @@ -222,7 +223,7 @@ public abstract class State { /** * Determines if a message is valid and did not originate from this host. - * + * * @param msg message to be validated * @return {@code true} if the message is valid, {@code false} otherwise */ @@ -252,7 +253,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Identification msg) { @@ -261,7 +262,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Leader msg) { @@ -270,7 +271,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Offline msg) { @@ -279,7 +280,7 @@ public abstract class State { /** * Publishes a message. - * + * * @param msg message to be published */ protected final void publish(Query msg) { @@ -288,7 +289,7 @@ public abstract class State { /** * Publishes a message on the specified channel. - * + * * @param channel channel * @param msg message to be published */ @@ -298,7 +299,7 @@ public abstract class State { /** * Publishes a message on the specified channel. - * + * * @param channel channel * @param msg message to be published */ @@ -308,7 +309,7 @@ public abstract class State { /** * Starts distributing messages using the specified bucket assignments. - * + * * @param assignments assignments */ protected final void startDistributing(BucketAssignments assignments) { @@ -319,7 +320,7 @@ public abstract class State { /** * Schedules a timer to fire after a delay. - * + * * @param delayMs delay in ms * @param task task */ @@ -329,7 +330,7 @@ public abstract class State { /** * Schedules a timer to fire repeatedly. - * + * * @param initialDelayMs initial delay ms * @param delayMs delay ms * @param task task @@ -342,7 +343,7 @@ public abstract class State { * Indicates that we failed to see our own heartbeat; must be a problem with the * internal topic. Assumes the problem is temporary and continues to use the current * bucket assignments. - * + * * @return a new {@link StartState} */ protected final State missedHeartbeat() { @@ -354,7 +355,7 @@ public abstract class State { /** * Indicates that the internal topic failed; this should only be invoked from the * StartState. Discards bucket assignments and begins processing everything locally. - * + * * @return a new {@link InactiveState} */ protected final State internalTopicFailed() { @@ -366,9 +367,9 @@ public abstract class State { /** * Makes a heart beat message. - * + * * @param timestampMs time, in milliseconds, associated with the message - * + * * @return a new message */ protected final Heartbeat makeHeartbeat(long timestampMs) { @@ -377,7 +378,7 @@ public abstract class State { /** * Makes an Identification message. - * + * * @return a new message */ protected Identification makeIdentification() { @@ -386,7 +387,7 @@ public abstract class State { /** * Makes an "offline" message. - * + * * @return a new message */ protected final Offline makeOffline() { @@ -395,7 +396,7 @@ public abstract class State { /** * Makes a query message. - * + * * @return a new message */ protected final Query makeQuery() { |