summaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src
diff options
context:
space:
mode:
Diffstat (limited to 'participant/participant-intermediary/src')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java21
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java53
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java36
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java34
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java36
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java57
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java34
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java55
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java139
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java46
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java (renamed from participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java)18
13 files changed, 250 insertions, 295 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
index adc9c2393..a87299bdc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
@@ -31,25 +31,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
-import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
/**
* This interface is used by participant implementations to use the participant intermediary.
*/
public interface ParticipantIntermediaryApi {
- /**
- * Initialise the participant intermediary.
- *
- * @param parameters the parameters for the intermediary
- */
- void init(ParticipantIntermediaryParameters parameters);
-
- /**
- * Close the intermediary.
- */
- void close();
/**
* Register a listener for control loop elements that are mediated by the intermediary.
@@ -128,12 +115,4 @@ public interface ParticipantIntermediaryApi {
*/
void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics);
- /**
- * Return participantHandler, This will not be used in real world, but for junits,
- * if participantHandler is not returned, there is no way to test state change messages
- * without dmaap simulator.
- *
- * @return the participant handler
- */
- ParticipantHandler getParticipantHandler();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
index 839088d72..838f47544 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
@@ -35,45 +35,41 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
-import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Component;
/**
* This class is api implementation used by participant intermediary.
*/
+@Component
public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi {
- // The activator for the participant intermediary
- private IntermediaryActivator activator;
+ // The handler for the participant intermediary
+ private ParticipantHandler participantHandler;
- @Override
- public void init(ParticipantIntermediaryParameters parameters) {
- activator = new IntermediaryActivator(parameters);
-
- activator.start();
- }
-
- @Override
- public void close() {
- activator.shutdown();
+ /**
+ * Constructor.
+ *
+ * @param participantHandler ParticipantHandler
+ */
+ public ParticipantIntermediaryApiImpl(ParticipantHandler participantHandler) {
+ this.participantHandler = participantHandler;
}
@Override
public void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener) {
- activator.getParticipantHandler().getControlLoopHandler()
- .registerControlLoopElementListener(controlLoopElementListener);
+ participantHandler.getControlLoopHandler().registerControlLoopElementListener(controlLoopElementListener);
}
@Override
public List<Participant> getParticipants(String name, String version) {
- return List.of(activator.getParticipantHandler().getParticipant(name, version));
+ return List.of(participantHandler.getParticipant(name, version));
}
@Override
public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) {
- return activator.getParticipantHandler().updateParticipantState(definition, state);
+ return participantHandler.updateParticipantState(definition, state);
}
@Override
@@ -83,13 +79,13 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp
@Override
public ControlLoops getControlLoops(String name, String version) {
- return activator.getParticipantHandler().getControlLoopHandler().getControlLoops();
+ return participantHandler.getControlLoopHandler().getControlLoops();
}
@Override
public Map<UUID, ControlLoopElement> getControlLoopElements(String name, String version) {
- List<ControlLoop> controlLoops = activator.getParticipantHandler()
- .getControlLoopHandler().getControlLoops().getControlLoopList();
+ List<ControlLoop> controlLoops =
+ participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList();
for (ControlLoop controlLoop : controlLoops) {
if (name.equals(controlLoop.getDefinition().getName())) {
@@ -101,8 +97,8 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp
@Override
public ControlLoopElement getControlLoopElement(UUID id) {
- List<ControlLoop> controlLoops = activator.getParticipantHandler()
- .getControlLoopHandler().getControlLoops().getControlLoopList();
+ List<ControlLoop> controlLoops =
+ participantHandler.getControlLoopHandler().getControlLoops().getControlLoopList();
for (ControlLoop controlLoop : controlLoops) {
ControlLoopElement clElement = controlLoop.getElements().get(id);
@@ -116,18 +112,11 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp
@Override
public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState currentState,
ControlLoopState newState) {
- return activator.getParticipantHandler().getControlLoopHandler()
- .updateControlLoopElementState(id, currentState, newState);
+ return participantHandler.getControlLoopHandler().updateControlLoopElementState(id, currentState, newState);
}
@Override
public void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics) {
- activator.getParticipantHandler().getControlLoopHandler()
- .updateControlLoopElementStatistics(id, elementStatistics);
- }
-
- @Override
- public ParticipantHandler getParticipantHandler() {
- return activator.getParticipantHandler();
+ participantHandler.getControlLoopHandler().updateControlLoopElementStatistics(id, elementStatistics);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
index 50b8b9cdc..e46c6db1b 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopStateChangeListener.java
@@ -20,23 +20,15 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
-import java.io.Closeable;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.listeners.ScoListener;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* Listener for Participant State Change messages sent by CLAMP.
*/
-public class ControlLoopStateChangeListener extends ScoListener<ParticipantControlLoopStateChange>
- implements Closeable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopStateChangeListener.class);
-
- private final ParticipantHandler participantHandler;
+@Component
+public class ControlLoopStateChangeListener extends ParticipantListener<ParticipantControlLoopStateChange> {
/**
* Constructs the object.
@@ -44,25 +36,7 @@ public class ControlLoopStateChangeListener extends ScoListener<ParticipantContr
* @param participantHandler the handler for managing the state of the participant
*/
public ControlLoopStateChangeListener(final ParticipantHandler participantHandler) {
- super(ParticipantControlLoopStateChange.class);
- this.participantHandler = participantHandler;
- }
-
- @Override
- public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
- final ParticipantControlLoopStateChange controlLoopStateChangeMsg) {
- LOGGER.debug("Control Loop State Change received from CLAMP - {}", controlLoopStateChangeMsg);
-
- if (participantHandler.canHandle(controlLoopStateChangeMsg)) {
- LOGGER.debug("Message for this participant");
- participantHandler.getControlLoopHandler().handleControlLoopStateChange(controlLoopStateChangeMsg);
- } else {
- LOGGER.debug("Message not for this participant");
- }
- }
-
- @Override
- public void close() {
- // No explicit action on this class
+ super(ParticipantControlLoopStateChange.class, participantHandler,
+ participantHandler::handleControlLoopStateChange);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
index ab2437c1c..d15643e0f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
@@ -20,22 +20,15 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
-import java.io.Closeable;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.listeners.ScoListener;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* Listener for Control Loop Update messages sent by CLAMP.
*/
-public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoopUpdate> implements Closeable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class);
-
- private final ParticipantHandler participantHandler;
+@Component
+public class ControlLoopUpdateListener extends ParticipantListener<ParticipantControlLoopUpdate> {
/**
* Constructs the object.
@@ -43,25 +36,6 @@ public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoo
* @param participantHandler the handler for managing the state of the participant
*/
public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
- super(ParticipantControlLoopUpdate.class);
- this.participantHandler = participantHandler;
- }
-
- @Override
- public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
- final ParticipantControlLoopUpdate participantControlLoopUpdateMsg) {
- LOGGER.debug("Control Loop update received from CLAMP - {}", participantControlLoopUpdateMsg);
-
- if (participantHandler.canHandle(participantControlLoopUpdateMsg)) {
- LOGGER.debug("Message for this participant");
- participantHandler.getControlLoopHandler().handleControlLoopUpdate(participantControlLoopUpdateMsg);
- } else {
- LOGGER.debug("Message not for this participant");
- }
- }
-
- @Override
- public void close() {
- // No explicit action on this class
+ super(ParticipantControlLoopUpdate.class, participantHandler, participantHandler::handleControlLoopUpdate);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java
index e0e6be329..15f5140eb 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantHealthCheckListener.java
@@ -20,22 +20,15 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
-import java.io.Closeable;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.listeners.ScoListener;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* Listener for Participant health status messages sent by CLAMP.
*/
-public class ParticipantHealthCheckListener extends ScoListener<ParticipantHealthCheck> implements Closeable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHealthCheckListener.class);
-
- private final ParticipantHandler participantHandler;
+@Component
+public class ParticipantHealthCheckListener extends ParticipantListener<ParticipantHealthCheck> {
/**
* Constructs the object.
@@ -43,27 +36,6 @@ public class ParticipantHealthCheckListener extends ScoListener<ParticipantHealt
* @param participantHandler the handler for managing the state and health of the participant
*/
public ParticipantHealthCheckListener(final ParticipantHandler participantHandler) {
- super(ParticipantHealthCheck.class);
- this.participantHandler = participantHandler;
- }
-
- @Override
- public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
- final ParticipantHealthCheck participantHealthCheckMsg) {
- LOGGER.debug("Participant Health Check message received from CLAMP - {}", participantHealthCheckMsg);
-
-
- if (participantHandler.canHandle(participantHealthCheckMsg)) {
- LOGGER.debug("Message for this participant");
- participantHandler.handleParticipantHealthCheck(participantHealthCheckMsg);
- } else {
- LOGGER.debug("Message not for this participant");
- }
-
- }
-
- @Override
- public void close() {
- // No explicit action on this class
+ super(ParticipantHealthCheck.class, participantHandler, participantHandler::handleParticipantHealthCheck);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
new file mode 100644
index 000000000..c6ad900b3
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantListener.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+
+import java.util.function.Consumer;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+
+/**
+ * Abstract Listener for Participant messages sent by CLAMP.
+ */
+public abstract class ParticipantListener<T extends ParticipantMessage> extends ScoListener<T> {
+
+ private final ParticipantHandler participantHandler;
+ private final Consumer<T> consumer;
+
+ /**
+ * Constructs the object.
+ *
+ * @param clazz class of message this handles
+ * @param participantHandler ParticipantHandler
+ * @param consumer function that handles the message
+ */
+ protected ParticipantListener(Class<T> clazz, ParticipantHandler participantHandler, Consumer<T> consumer) {
+ super(clazz);
+ this.participantHandler = participantHandler;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) {
+ if (participantHandler.appliesTo(message)) {
+ consumer.accept(message);
+ }
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java
index c1a8b5b4a..ec6548a7c 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStateChangeListener.java
@@ -20,23 +20,16 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
-import java.io.Closeable;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.listeners.ScoListener;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* Listener for Participant State Change messages sent by CLAMP.
*
*/
-public class ParticipantStateChangeListener extends ScoListener<ParticipantStateChange> implements Closeable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStateChangeListener.class);
-
- private final ParticipantHandler participantHandler;
+@Component
+public class ParticipantStateChangeListener extends ParticipantListener<ParticipantStateChange> {
/**
* Constructs the object.
@@ -44,25 +37,6 @@ public class ParticipantStateChangeListener extends ScoListener<ParticipantState
* @param participantHandler the handler for managing the state of the participant
*/
public ParticipantStateChangeListener(final ParticipantHandler participantHandler) {
- super(ParticipantStateChange.class);
- this.participantHandler = participantHandler;
- }
-
- @Override
- public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
- final ParticipantStateChange participantStateChangeMsg) {
- LOGGER.debug("Participant State Change received from CLAMP - {}", participantStateChangeMsg);
-
- if (participantHandler.canHandle(participantStateChangeMsg)) {
- LOGGER.debug("Message for this participant");
- participantHandler.handleParticipantStateChange(participantStateChangeMsg);
- } else {
- LOGGER.debug("Message not for this participant");
- }
- }
-
- @Override
- public void close() {
- // No explicit action on this class
+ super(ParticipantStateChange.class, participantHandler, participantHandler::handleParticipantStateChange);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java
index bc53b4e9d..78b998453 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java
@@ -20,7 +20,6 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
-import java.io.Closeable;
import java.util.List;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
@@ -32,7 +31,7 @@ import org.slf4j.LoggerFactory;
* This class is used to send Participant Status messages to clamp using TopicSinkClient.
*
*/
-public class ParticipantStatusPublisher implements Closeable {
+public class ParticipantStatusPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class);
private final TopicSinkClient topicSinkClient;
@@ -55,9 +54,4 @@ public class ParticipantStatusPublisher implements Closeable {
topicSinkClient.send(participantStatus);
LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
}
-
- @Override
- public void close() {
- // No explicit action on this class
- }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
new file mode 100644
index 000000000..dc7d87eec
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java
@@ -0,0 +1,55 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.config;
+
+import java.util.List;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class BeanFactory {
+
+ // Name of the message type for messages on topics
+ private static final String[] MSG_TYPE_NAMES = {"messageType"};
+
+ /**
+ * create ParticipantStatusPublisher.
+ *
+ * @param parameters the ParticipantParameters
+ * @return ParticipantStatusPublisher
+ */
+ @Bean
+ public ParticipantStatusPublisher publisher(final ParticipantParameters parameters) {
+ List<TopicSink> topicSinks = TopicEndpointManager.getManager()
+ .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks());
+ return new ParticipantStatusPublisher(topicSinks);
+ }
+
+ @Bean
+ public MessageTypeDispatcher msgDispatcher() {
+ return new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
index 3eebd177f..50048ffc2 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
@@ -20,7 +20,6 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
-import java.io.Closeable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -51,7 +50,7 @@ import org.slf4j.LoggerFactory;
* This class is responsible for managing the state of all control loops in the participant.
*/
@NoArgsConstructor
-public class ControlLoopHandler implements Closeable {
+public class ControlLoopHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class);
private ToscaConceptIdentifier participantType = null;
@@ -77,11 +76,6 @@ public class ControlLoopHandler implements Closeable {
this.messageSender = messageSender;
}
- @Override
- public void close() {
- // No explicit action on this class
- }
-
public void registerControlLoopElementListener(ControlLoopElementListener listener) {
listeners.add(listener);
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
index 3eae27267..2d789d40d 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
@@ -20,125 +20,100 @@
package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.ws.rs.core.Response.Status;
-import lombok.Getter;
-import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType;
-import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener;
-import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
-import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
/**
* This class activates the Participant Intermediary together with all its handlers.
*/
-public class IntermediaryActivator extends ServiceManagerContainer {
- // Name of the message type for messages on topics
- private static final String[] MSG_TYPE_NAMES = {"messageType"};
+@Component
+public class IntermediaryActivator extends ServiceManagerContainer implements Closeable {
- @Getter
- private final ParticipantIntermediaryParameters parameters;
+ private final ApplicationContext applicationContext;
// Topics from which the participant receives and to which the participant sends messages
- private List<TopicSink> topicSinks;
private List<TopicSource> topicSources;
- // The participant handler for this intermediary
- final AtomicReference<ParticipantHandler> participantHandler = new AtomicReference<>();
-
- /**
- * Listens for messages on the topic, decodes them into a message, and then dispatches them.
- */
- private final MessageTypeDispatcher msgDispatcher;
-
/**
* Instantiate the activator for participant.
*
- * @param parameters the parameters for the participant intermediary
- * @throws ControlLoopRuntimeException when the activation fails
+ * @param applicationContext ApplicationContext
+ * @param parameters the ParticipantParameters
*/
- public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) {
- this.parameters = parameters;
-
- topicSinks =
- TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks());
+ public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters) {
+ this.applicationContext = applicationContext;
- topicSources =
- TopicEndpointManager.getManager().addTopicSources(parameters.getClampControlLoopTopics().getTopicSources());
-
- try {
- this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- } catch (final RuntimeException e) {
- throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
- "topic message dispatcher failed to start", e);
- }
+ topicSources = TopicEndpointManager.getManager()
+ .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources());
// @formatter:off
- final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>();
- final AtomicReference<ParticipantStateChangeListener> participantStateChangeListener = new AtomicReference<>();
- final AtomicReference<ParticipantHealthCheckListener> participantHealthCheckListener = new AtomicReference<>();
- final AtomicReference<ControlLoopStateChangeListener> controlLoopStateChangeListener = new AtomicReference<>();
- final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>();
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- addAction("Participant Status Publisher",
- () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)),
- () -> statusPublisher.get().close());
-
- addAction("Participant Handler",
- () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())),
- () -> participantHandler.get().close());
-
- addAction("Participant State Change Listener",
- () -> participantStateChangeListener.set(new ParticipantStateChangeListener(participantHandler.get())),
- () -> participantStateChangeListener.get().close());
-
- addAction("Participant Health Check Listener",
- () -> participantHealthCheckListener.set(new ParticipantHealthCheckListener(participantHandler.get())),
- () -> participantHealthCheckListener.get().close());
-
- addAction("Control Loop State Change Listener",
- () -> controlLoopStateChangeListener.set(new ControlLoopStateChangeListener(participantHandler.get())),
- () -> controlLoopStateChangeListener.get().close());
-
- addAction("Control Loop Update Listener",
- () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())),
- () -> controlLoopUpdateListener.get().close());
-
addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
// @formatter:on
}
/**
+ * Handle ContextRefreshEvent.
+ *
+ * @param ctxRefreshedEvent ContextRefreshedEvent
+ */
+ @EventListener
+ public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) {
+ if (!isAlive()) {
+ start();
+ }
+ }
+
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ if (isAlive()) {
+ stop();
+ }
+ }
+
+ /**
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
+ MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
+
msgDispatcher.register(ParticipantMessageType.PARTICIPANT_STATE_CHANGE.name(),
- (ScoListener<ParticipantStateChange>) new ParticipantStateChangeListener(participantHandler.get()));
+ applicationContext.getBean(ParticipantStateChangeListener.class));
+
msgDispatcher.register(ParticipantMessageType.PARTICIPANT_HEALTH_CHECK.name(),
- (ScoListener<ParticipantHealthCheck>) new ParticipantHealthCheckListener(participantHandler.get()));
+ applicationContext.getBean(ParticipantHealthCheckListener.class));
+
msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_STATE_CHANGE.name(),
- (ScoListener<ParticipantControlLoopStateChange>) new ControlLoopStateChangeListener(
- participantHandler.get()));
+ applicationContext.getBean(ControlLoopStateChangeListener.class));
+
msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(),
- (ScoListener<ParticipantControlLoopUpdate>) new ControlLoopUpdateListener(participantHandler.get()));
+ applicationContext.getBean(ControlLoopUpdateListener.class));
+
for (final TopicSource source : topicSources) {
source.register(msgDispatcher);
}
@@ -148,17 +123,15 @@ public class IntermediaryActivator extends ServiceManagerContainer {
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
+ MessageTypeDispatcher msgDispatcher = applicationContext.getBean(MessageTypeDispatcher.class);
+
for (final TopicSource source : topicSources) {
source.unregister(msgDispatcher);
}
}
- /**
- * Return the participant handler.
- *
- * @return the participant handler
- */
- public ParticipantHandler getParticipantHandler() {
- return participantHandler.get();
+ @Override
+ public void close() throws IOException {
+ super.shutdown();
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
index 5e414b175..1c54658fa 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
@@ -28,6 +28,8 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage;
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails;
@@ -35,15 +37,17 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti
import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher;
-import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* This class is responsible for managing the state of a participant.
*/
@Getter
+@Component
public class ParticipantHandler implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class);
@@ -65,18 +69,18 @@ public class ParticipantHandler implements Closeable {
* @param parameters the parameters of the participant
* @param publisher the publisher for sending responses to messages
*/
- public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) {
- this.participantType = parameters.getParticipantType();
- this.participantId = parameters.getParticipantId();
- this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval());
- this.controlLoopHandler = new ControlLoopHandler(parameters, sender);
+ public ParticipantHandler(ParticipantParameters parameters, ParticipantStatusPublisher publisher) {
+ this.participantType = parameters.getIntermediaryParameters().getParticipantType();
+ this.participantId = parameters.getIntermediaryParameters().getParticipantId();
+ this.sender =
+ new MessageSender(this, publisher, parameters.getIntermediaryParameters().getReportingTimeInterval());
+ this.controlLoopHandler = new ControlLoopHandler(parameters.getIntermediaryParameters(), sender);
this.participantStatistics = new ParticipantStatistics();
}
@Override
public void close() {
sender.close();
- controlLoopHandler.close();
}
/**
@@ -133,6 +137,24 @@ public class ParticipantHandler implements Closeable {
}
/**
+ * Handle a control loop update message.
+ *
+ * @param updateMsg the update message
+ */
+ public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) {
+ controlLoopHandler.handleControlLoopUpdate(updateMsg);
+ }
+
+ /**
+ * Handle a control loop state change message.
+ *
+ * @param stateChangeMsg the state change message
+ */
+ public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) {
+ controlLoopHandler.handleControlLoopStateChange(stateChangeMsg);
+ }
+
+ /**
* Method to handle when the new state from participant is active.
*
* @param response participant response
@@ -233,4 +255,14 @@ public class ParticipantHandler implements Closeable {
public boolean canHandle(ParticipantMessage partipantMsg) {
return partipantMsg.appliesTo(participantType, participantId);
}
+
+ /**
+ * Check if a participant message applies to this participant handler.
+ *
+ * @param partipantMsg the message to check
+ * @return true if it applies, false otherwise
+ */
+ public boolean appliesTo(ParticipantMessage partipantMsg) {
+ return partipantMsg.appliesTo(participantType, participantId);
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java
index d7cc4b2ed..c350b1b95 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryFactory.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/parameters/ParticipantParameters.java
@@ -18,21 +18,9 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.clamp.controlloop.participant.intermediary.api;
+package org.onap.policy.clamp.controlloop.participant.intermediary.parameters;
-import org.onap.policy.clamp.controlloop.participant.intermediary.api.impl.ParticipantIntermediaryApiImpl;
+public interface ParticipantParameters {
-/**
- * Factory class for creating {@link ParticipantIntermediaryApi} instances.
- */
-public class ParticipantIntermediaryFactory {
-
- /**
- * Create an implementation of the {@link ParticipantIntermediaryApi} interface.
- *
- * @return the implementation of the API
- */
- public ParticipantIntermediaryApi createApiImplementation() {
- return new ParticipantIntermediaryApiImpl();
- }
+ ParticipantIntermediaryParameters getIntermediaryParameters();
}