summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main/java/org/onap')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java75
1 files changed, 38 insertions, 37 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 3d939392..f2277be3 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -2,14 +2,14 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
/**
* A state in the finite state machine.
- *
+ *
* <p>A state may have several timers associated with it, which must be cancelled whenever
* the state is changed. Assumes that timers are not continuously added to the same state.
*/
@@ -63,7 +63,7 @@ public abstract class State {
/**
* Constructor.
- *
+ *
* @param mgr pooling manager
*/
public State(PoolingManager mgr) {
@@ -74,7 +74,7 @@ public abstract class State {
* Gets the server-side filter to use when polling the DMaaP internal topic. The
* default method returns a filter that accepts messages on the admin channel and on
* the host's own channel.
- *
+ *
* @return the server-side filter to use.
*/
@SuppressWarnings("unchecked")
@@ -98,7 +98,7 @@ public abstract class State {
/**
* Transitions to the "start" state.
- *
+ *
* @return the new state
*/
public final State goStart() {
@@ -107,7 +107,7 @@ public abstract class State {
/**
* Transitions to the "query" state.
- *
+ *
* @return the new state
*/
public State goQuery() {
@@ -116,7 +116,7 @@ public abstract class State {
/**
* Goes active with a new set of assignments.
- *
+ *
* @param asgn new assignments
* @return the new state, either Active or Inactive, depending on whether or not this
* host has an assignment
@@ -134,7 +134,7 @@ public abstract class State {
/**
* Transitions to the "inactive" state.
- *
+ *
* @return the new state
*/
protected State goInactive() {
@@ -144,25 +144,26 @@ public abstract class State {
/**
* Processes a message. The default method passes it to the manager to handle and
* returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
public State process(Forward msg) {
- if (!getHost().equals(msg.getChannel())) {
+ if (getHost().equals(msg.getChannel())) {
+ logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
+ mgr.handle(msg);
+
+ } else {
logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
getTopic());
- return null;
}
- logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
- mgr.handle(msg);
return null;
}
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -173,7 +174,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -185,7 +186,7 @@ public abstract class State {
/**
* Processes a message. The default method copies the assignments and then returns
* {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -200,7 +201,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -211,7 +212,7 @@ public abstract class State {
/**
* Processes a message. The default method just returns {@code null}.
- *
+ *
* @param msg message to be processed
* @return the new state, or {@code null} if the state is unchanged
*/
@@ -222,7 +223,7 @@ public abstract class State {
/**
* Determines if a message is valid and did not originate from this host.
- *
+ *
* @param msg message to be validated
* @return {@code true} if the message is valid, {@code false} otherwise
*/
@@ -252,7 +253,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Identification msg) {
@@ -261,7 +262,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Leader msg) {
@@ -270,7 +271,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Offline msg) {
@@ -279,7 +280,7 @@ public abstract class State {
/**
* Publishes a message.
- *
+ *
* @param msg message to be published
*/
protected final void publish(Query msg) {
@@ -288,7 +289,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
- *
+ *
* @param channel channel
* @param msg message to be published
*/
@@ -298,7 +299,7 @@ public abstract class State {
/**
* Publishes a message on the specified channel.
- *
+ *
* @param channel channel
* @param msg message to be published
*/
@@ -308,7 +309,7 @@ public abstract class State {
/**
* Starts distributing messages using the specified bucket assignments.
- *
+ *
* @param assignments assignments
*/
protected final void startDistributing(BucketAssignments assignments) {
@@ -319,7 +320,7 @@ public abstract class State {
/**
* Schedules a timer to fire after a delay.
- *
+ *
* @param delayMs delay in ms
* @param task task
*/
@@ -329,7 +330,7 @@ public abstract class State {
/**
* Schedules a timer to fire repeatedly.
- *
+ *
* @param initialDelayMs initial delay ms
* @param delayMs delay ms
* @param task task
@@ -342,7 +343,7 @@ public abstract class State {
* Indicates that we failed to see our own heartbeat; must be a problem with the
* internal topic. Assumes the problem is temporary and continues to use the current
* bucket assignments.
- *
+ *
* @return a new {@link StartState}
*/
protected final State missedHeartbeat() {
@@ -354,7 +355,7 @@ public abstract class State {
/**
* Indicates that the internal topic failed; this should only be invoked from the
* StartState. Discards bucket assignments and begins processing everything locally.
- *
+ *
* @return a new {@link InactiveState}
*/
protected final State internalTopicFailed() {
@@ -366,9 +367,9 @@ public abstract class State {
/**
* Makes a heart beat message.
- *
+ *
* @param timestampMs time, in milliseconds, associated with the message
- *
+ *
* @return a new message
*/
protected final Heartbeat makeHeartbeat(long timestampMs) {
@@ -377,7 +378,7 @@ public abstract class State {
/**
* Makes an Identification message.
- *
+ *
* @return a new message
*/
protected Identification makeIdentification() {
@@ -386,7 +387,7 @@ public abstract class State {
/**
* Makes an "offline" message.
- *
+ *
* @return a new message
*/
protected final Offline makeOffline() {
@@ -395,7 +396,7 @@ public abstract class State {
/**
* Makes a query message.
- *
+ *
* @return a new message
*/
protected final Query makeQuery() {