aboutsummaryrefslogtreecommitdiffstats
path: root/tosca-controlloop/participant/participant-intermediary/src/main/java
diff options
context:
space:
mode:
authorSirisha_Manchikanti <sirisha.manchikanti@est.tech>2021-03-24 13:06:20 +0000
committerSirisha_Manchikanti <sirisha.manchikanti@est.tech>2021-03-30 11:57:46 +0100
commitbd80043a42f83392992bc31730b4a2c17475468a (patch)
treeaee3cd833691603d364454003516b8a166094705 /tosca-controlloop/participant/participant-intermediary/src/main/java
parent88d90cdaa6b60316d2c8872c4d2e5723ee4c8512 (diff)
Participant and controlloop event handling
This commit includes handling for ControlLoopUpdate and ParticipantStatus events, respective handling in ControlLoopHandler and ParticipantHandler is covered. ParticipantStateChange, ControlLoopStateChange and ParticipantHealthCheck events are not covered in this commit. Underlying participant logic (to handle Policy-participant APIs, DCAE-participant APIs etc) will be taken care in upcoming commits. Issue-ID: POLICY-2987 Signed-off-by: Sirisha_Manchikanti <sirisha.manchikanti@est.tech> Change-Id: Iba18c565218d2304ec57ce5bc47ad8765386db95
Diffstat (limited to 'tosca-controlloop/participant/participant-intermediary/src/main/java')
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java32
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java37
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java61
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java118
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java62
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java282
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java129
-rw-r--r--tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java204
8 files changed, 909 insertions, 16 deletions
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
index 6c3e029fe..7ea8b4f35 100644
--- a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java
@@ -30,6 +30,7 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -42,12 +43,12 @@ public interface ParticipantIntermediaryApi {
*
* @param parameters the parameters for the intermediary
*/
- public void init(ParticipantIntermediaryParameters parameters);
+ void init(ParticipantIntermediaryParameters parameters);
/**
* Close the intermediary.
*/
- public void close();
+ void close();
/**
* Get participants loops from the intermediary API.
@@ -56,23 +57,23 @@ public interface ParticipantIntermediaryApi {
* @param version the participant version, null for all
* @return the participants
*/
- public List<Participant> getParticipants(String name, String version);
+ List<Participant> getParticipants(String name, String version);
/**
* Update the state of a participant.
*
* @param definition the definition of the participant to update the state on
* @param state the state of the participant
- * @return updated participant
+ * @return the participant
*/
- public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state);
+ Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state);
/**
* Update the statistics of a participant.
*
* @param participantStatistics the statistics of the participant
*/
- public void updateParticipantStatistics(ParticipantStatistics participantStatistics);
+ void updateParticipantStatistics(ParticipantStatistics participantStatistics);
/**
* Get control loops from the intermediary API.
@@ -81,7 +82,7 @@ public interface ParticipantIntermediaryApi {
* @param version the control loop element version, null for all
* @return the control loop elements
*/
- public ControlLoops getControlLoops(String name, String version);
+ ControlLoops getControlLoops(String name, String version);
/**
* Get control loop elements from the intermediary API.
@@ -90,7 +91,7 @@ public interface ParticipantIntermediaryApi {
* @param version the control loop element version, null for all
* @return the control loop elements
*/
- public List<ControlLoopElement> getControlLoopElements(String name, String version);
+ List<ControlLoopElement> getControlLoopElements(String name, String version);
/**
* Update the state of a control loop.
@@ -99,7 +100,7 @@ public interface ParticipantIntermediaryApi {
* @param state the state of the control loop
* @return ControlLoop updated control loop
*/
- public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state);
+ ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state);
/**
* Update the state of a control loop element.
@@ -108,12 +109,21 @@ public interface ParticipantIntermediaryApi {
* @param state the state of the control loop element
* @return ControlLoopElement updated control loop element
*/
- public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state);
+ ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state);
/**
* Update the control loop element statistics.
*
* @param elementStatistics the updated statistics
*/
- public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics);
+ void updateControlLoopElementStatistics(ClElementStatistics elementStatistics);
+
+ /**
+ * Returns participantHandler, This will not be used in real world, but for junits,
+ * if participantHandler is not returned, there is no way to test state change messages
+ * without dmaap simulator.
+ *
+ * @return ParticipantHandler returns a participantHandler
+ */
+ ParticipantHandler getParticipantHandler();
}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
index 939f927e6..9e494862e 100644
--- a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java
@@ -32,6 +32,8 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState;
import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics;
import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.IntermediaryActivator;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -40,49 +42,74 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
*/
public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryApi {
+ // The activator for the participant intermediary
+ private IntermediaryActivator activator;
+
@Override
public void init(ParticipantIntermediaryParameters parameters) {
+ activator = new IntermediaryActivator(parameters);
+
+ activator.start();
}
@Override
public void close() {
+ activator.shutdown();
}
@Override
public List<Participant> getParticipants(String name, String version) {
- return Collections.emptyList();
+ return List.of(activator.getParticipantHandler().getParticipant(name, version));
}
@Override
public Participant updateParticipantState(ToscaConceptIdentifier definition, ParticipantState state) {
- return null;
+ return activator.getParticipantHandler().updateParticipantState(definition, state);
}
@Override
public void updateParticipantStatistics(ParticipantStatistics participantStatistics) {
+ // TODO Auto-generated method stub
}
@Override
public ControlLoops getControlLoops(String name, String version) {
- return null;
+ return activator.getParticipantHandler().getControlLoopHandler().getControlLoops();
}
@Override
public List<ControlLoopElement> getControlLoopElements(String name, String version) {
+ List<ControlLoop> controlLoops = activator.getParticipantHandler()
+ .getControlLoopHandler().getControlLoops().getControlLoopList();
+
+ for (ControlLoop controlLoop : controlLoops) {
+ if (controlLoop.getDefinition().getName().equals(name)) {
+ return controlLoop.getElements();
+ }
+ }
return Collections.emptyList();
}
@Override
public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state) {
- return null;
+ return activator.getParticipantHandler().getControlLoopHandler()
+ .updateControlLoopState(definition, state);
}
@Override
public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state) {
- return null;
+ return activator.getParticipantHandler().getControlLoopHandler()
+ .updateControlLoopElementState(id, state);
}
@Override
public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics) {
+ activator.getParticipantHandler().getControlLoopHandler()
+ .updateControlLoopElementStatistics(elementStatistics);
+ }
+
+ @Override
+ public ParticipantHandler getParticipantHandler() {
+ return activator.getParticipantHandler();
}
}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
new file mode 100644
index 000000000..2ba98891f
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ControlLoopUpdateListener.java
@@ -0,0 +1,61 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+
+import java.io.Closeable;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listener for Control Loop Update messages sent by CLAMP.
+ */
+public class ControlLoopUpdateListener extends ScoListener<ParticipantControlLoopUpdate> implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopUpdateListener.class);
+
+ private final ParticipantHandler participantHandler;
+
+ /**
+ * Constructs the object.
+ *
+ * @param participantHandler the handler for managing the state of the participant
+ */
+ public ControlLoopUpdateListener(final ParticipantHandler participantHandler) {
+ super(ParticipantControlLoopUpdate.class);
+ this.participantHandler = participantHandler;
+ }
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final StandardCoderObject sco,
+ final ParticipantControlLoopUpdate participantControlLoopUpdateMsg) {
+ LOGGER.debug("Control Loop update received from CLAMP - {}", participantControlLoopUpdateMsg);
+ participantHandler.getControlLoopHandler().handleControlLoopUpdate(participantControlLoopUpdateMsg);
+ }
+
+ @Override
+ public void close() {
+ // No explicit action on this class
+ }
+}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
new file mode 100644
index 000000000..20490f81d
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java
@@ -0,0 +1,118 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+
+import java.io.Closeable;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class sends messages from participants to CLAMP.
+ */
+public class MessageSender extends TimerTask implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class);
+
+ private final ParticipantHandler participantHandler;
+ private final ParticipantStatusPublisher publisher;
+ private ScheduledExecutorService timerPool;
+
+ /**
+ * Constructor, set the publisher.
+ *
+ * @param participantHandler the participant handler to use for gathering information
+ * @param publisher the publisher to use for sending messages
+ * @param interval time interval to send Participant Status periodic messages
+ */
+ public MessageSender(ParticipantHandler participantHandler, ParticipantStatusPublisher publisher,
+ long interval) {
+ this.participantHandler = participantHandler;
+ this.publisher = publisher;
+
+ // Kick off the timer
+ timerPool = makeTimerPool();
+ timerPool.scheduleAtFixedRate(this, 0, interval, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void run() {
+ LOGGER.debug("Sent heartbeat to CLAMP");
+
+ ParticipantResponseDetails response = new ParticipantResponseDetails();
+
+ response.setResponseTo(null);
+ response.setResponseStatus(ParticipantResponseStatus.PERIODIC);
+ response.setResponseMessage("Periodic response from participant");
+ }
+
+ @Override
+ public void close() {
+ timerPool.shutdown();
+ }
+
+ /**
+ * Send a response message for this participant.
+ *
+ * @param response the details to include in the response message
+ */
+ public void sendResponse(ParticipantResponseDetails response) {
+ sendResponse(null, response);
+ }
+
+ /**
+ * Send a response message for this participant.
+ *
+ * @param controlLoopId the control loop to which this message is a response
+ * @param response the details to include in the response message
+ */
+ public void sendResponse(ToscaConceptIdentifier controlLoopId, ParticipantResponseDetails response) {
+ ParticipantStatus status = new ParticipantStatus();
+
+ // Participant related fields
+ status.setParticipantId(participantHandler.getParticipantId());
+ status.setState(participantHandler.getState());
+ status.setHealthStatus(participantHandler.getHealthStatus());
+
+ // Control loop related fields
+ status.setControlLoopId(controlLoopId);
+ status.setControlLoops(participantHandler.getControlLoopHandler().getControlLoops());
+ status.setResponse(response);
+
+ publisher.send(status);
+ }
+
+ /**
+ * Makes a new timer pool.
+ *
+ * @return a new timer pool
+ */
+ protected ScheduledExecutorService makeTimerPool() {
+ return Executors.newScheduledThreadPool(1);
+ }
+}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java
new file mode 100644
index 000000000..e909327cd
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.comm;
+
+import java.io.Closeable;
+import java.util.List;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to send Participant Status messages to clamp using TopicSinkClient.
+ */
+public class ParticipantStatusPublisher implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class);
+
+ private final TopicSinkClient topicSinkClient;
+
+ /**
+ * Constructor for instantiating ParticipantStatusPublisher.
+ *
+ * @param topicSinks the topic sinks
+ */
+ public ParticipantStatusPublisher(List<TopicSink> topicSinks) {
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ }
+
+ /**
+ * Method to send Participant Status message to clamp on demand.
+ *
+ * @param participantStatus the Participant Status
+ */
+ public void send(final ParticipantStatus participantStatus) {
+ topicSinkClient.send(participantStatus);
+ LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus);
+ }
+
+ @Override
+ public void close() {
+ // No explicit action on this class
+ }
+}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
new file mode 100644
index 000000000..f27be961b
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ControlLoopHandler.java
@@ -0,0 +1,282 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.collections4.CollectionUtils;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ClElementStatistics;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * This class is responsible for managing the state of all control loops in the participant.
+ */
+public class ControlLoopHandler implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControlLoopHandler.class);
+
+ private ToscaConceptIdentifier participantId = null;
+ private MessageSender sender = null;
+
+ private final Map<ToscaConceptIdentifier, ControlLoop> controlLoopMap = new LinkedHashMap<>();
+ private final Map<UUID, ControlLoopElement> elementsOnThisParticipant = new LinkedHashMap<>();
+
+ public ControlLoopHandler() {
+ }
+
+ /**
+ * Constructor, set the participant ID and sender.
+ *
+ * @param parameters the parameters of the participant
+ * @param sender the sender for sending responses to messages
+ */
+ public ControlLoopHandler(ParticipantIntermediaryParameters parameters, MessageSender sender) {
+ this.participantId = parameters.getParticipantId();
+ this.sender = sender;
+ }
+
+ @Override
+ public void close() {
+ // No explicit action on this class
+ }
+
+ /**
+ * Handle a control loop element state change message.
+ *
+ * @param id controlloop element id
+ * @param state the updated state
+ * @return controlLoopElement the updated controlloop element
+ */
+ public ControlLoopElement updateControlLoopElementState(UUID id, ControlLoopOrderedState state) {
+
+ if (id == null) {
+ return null;
+ }
+
+ ControlLoopElement clElement = elementsOnThisParticipant.get(id);
+ if (clElement != null) {
+ clElement.setOrderedState(state);
+ LOGGER.debug("Control loop element {} ordered state changed to {}", id, state);
+ ParticipantResponseDetails response = new ParticipantResponseDetails();
+ sender.sendResponse(response);
+ return elementsOnThisParticipant.get(id);
+ }
+
+ return null;
+ }
+
+ public void updateControlLoopElementStatistics(ClElementStatistics elementStatistics) {
+ // TODO Handle statistics coming from a participant implementation
+ }
+
+ /**
+ * Handle a control loop state change message.
+ *
+ * @param definition controlloop id
+ * @param state the updated state
+ * @return controlLoop the updated controlloop
+ */
+ public ControlLoop updateControlLoopState(ToscaConceptIdentifier definition, ControlLoopOrderedState state) {
+ if (definition == null) {
+ return null;
+ }
+
+ ControlLoop controlLoop = controlLoopMap.get(definition);
+ if (controlLoop == null) {
+ LOGGER.debug("Control loop {} does not use this participant", definition.getName());
+ return null;
+ }
+
+ ParticipantResponseDetails response = new ParticipantResponseDetails();
+ handleState(controlLoop, response, state);
+ sender.sendResponse(response);
+ return controlLoop;
+ }
+
+ /**
+ * Handle a control loop state change message.
+ *
+ * @param stateChangeMsg the state change message
+ */
+ public void handleControlLoopStateChange(ParticipantControlLoopStateChange stateChangeMsg) {
+ if (stateChangeMsg.getControlLoopId() == null) {
+ return;
+ }
+
+ ControlLoop controlLoop = controlLoopMap.get(stateChangeMsg.getControlLoopId());
+
+ if (controlLoop == null) {
+ LOGGER.debug("Control loop {} does not use this participant", stateChangeMsg.getControlLoopId());
+ return;
+ }
+
+ ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg);
+ handleState(controlLoop, response, stateChangeMsg.getOrderedState());
+ sender.sendResponse(response);
+ }
+
+ /**
+ * Method to handle state changes.
+ *
+ * @param controlLoop participant response
+ * @param response participant response
+ * @param state controlloop ordered state
+ */
+ private void handleState(final ControlLoop controlLoop, final ParticipantResponseDetails response,
+ ControlLoopOrderedState state) {
+ switch (state) {
+ case UNINITIALISED:
+ handleUninitialisedState(controlLoop, response);
+ break;
+ case PASSIVE:
+ handlePassiveState(controlLoop, response);
+ break;
+ case RUNNING:
+ handleRunningState(controlLoop, response);
+ break;
+ default:
+ LOGGER.debug("StateChange message has no state, state is null {}", controlLoop.getDefinition());
+ break;
+ }
+ }
+
+ /**
+ * Handle a control loop update message.
+ *
+ * @param updateMsg the update message
+ */
+ public void handleControlLoopUpdate(ParticipantControlLoopUpdate updateMsg) {
+ if (!updateMsg.appliesTo(participantId)) {
+ return;
+ }
+
+ ControlLoop controlLoop = controlLoopMap.get(updateMsg.getControlLoopId());
+
+ ParticipantResponseDetails response = new ParticipantResponseDetails(updateMsg);
+
+ // TODO: Updates to existing ControlLoops are not supported yet (Addition/Removal of ControlLoop
+ // elements to existing ControlLoop has to be supported).
+ if (controlLoop != null) {
+ response.setResponseStatus(ParticipantResponseStatus.FAIL);
+ response.setResponseMessage("Control loop " + updateMsg.getControlLoopId()
+ + " already defined on participant " + participantId);
+
+ sender.sendResponse(response);
+ return;
+ }
+
+ controlLoop = updateMsg.getControlLoop();
+ controlLoop.getElements().removeIf(element -> participantId.equals(element.getParticipantId()));
+
+ controlLoopMap.put(updateMsg.getControlLoopId(), controlLoop);
+ for (ControlLoopElement element : updateMsg.getControlLoop().getElements()) {
+ element.setState(element.getOrderedState().asState());
+ elementsOnThisParticipant.put(element.getId(), element);
+ }
+
+ response.setResponseStatus(ParticipantResponseStatus.SUCCESS);
+ response.setResponseMessage(
+ "Control loop " + updateMsg.getControlLoopId() + " defined on participant " + participantId);
+
+ sender.sendResponse(response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is UNINITIALISED state.
+ *
+ * @param controlLoop participant response
+ * @param response participant response
+ */
+ private void handleUninitialisedState(final ControlLoop controlLoop, final ParticipantResponseDetails response) {
+ handleStateChange(controlLoop, ControlLoopState.UNINITIALISED, response);
+ controlLoopMap.remove(controlLoop.getKey().asIdentifier());
+ }
+
+ /**
+ * Method to handle when the new state from participant is PASSIVE state.
+ *
+ * @param controlLoop participant response
+ * @param response participant response
+ */
+ private void handlePassiveState(final ControlLoop controlLoop, final ParticipantResponseDetails response) {
+ handleStateChange(controlLoop, ControlLoopState.PASSIVE, response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is RUNNING state.
+ *
+ * @param controlLoop participant response
+ * @param response participant response
+ */
+ private void handleRunningState(final ControlLoop controlLoop, final ParticipantResponseDetails response) {
+ handleStateChange(controlLoop, ControlLoopState.RUNNING, response);
+ }
+
+ /**
+ * Method to update the state of control loop elements.
+ *
+ * @param controlLoop participant status in memory
+ * @param state new state of the control loop elements
+ */
+ private void handleStateChange(ControlLoop controlLoop, ControlLoopState newState,
+ ParticipantResponseDetails response) {
+
+ if (newState.equals(controlLoop.getState())) {
+ response.setResponseStatus(ParticipantResponseStatus.SUCCESS);
+ response.setResponseMessage("Control loop is already in state " + newState);
+ return;
+ }
+
+ if (!CollectionUtils.isEmpty(controlLoop.getElements())) {
+ controlLoop.getElements().forEach(element -> element.setState(newState));
+ }
+
+ response.setResponseStatus(ParticipantResponseStatus.SUCCESS);
+ response.setResponseMessage("ControlLoop state changed from " + controlLoop.getState() + " to " + newState);
+ controlLoop.setState(newState);
+ }
+
+ /**
+ * Get control loops as a {@link ConrolLoops} class.
+ *
+ * @return the control loops
+ */
+ public ControlLoops getControlLoops() {
+ ControlLoops controlLoops = new ControlLoops();
+ controlLoops.setControlLoopList(new ArrayList<>(controlLoopMap.values()));
+ return controlLoops;
+ }
+}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
new file mode 100644
index 000000000..dd0cf30a8
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java
@@ -0,0 +1,129 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import org.onap.policy.clamp.controlloop.common.exception.ControlLoopRuntimeException;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+
+/**
+ * This class activates the Participant Intermediary together with all its handlers.
+ */
+public class IntermediaryActivator extends ServiceManagerContainer {
+ // Name of the message type for messages on topics
+ private static final String[] MSG_TYPE_NAMES = {"messageType"};
+
+ @Getter
+ private final ParticipantIntermediaryParameters parameters;
+
+ // Topics from which the participant receives and to which the participant sends messages
+ private List<TopicSink> topicSinks;
+ private List<TopicSource> topicSources;
+
+ // The participant handler for this intermediary
+ final AtomicReference<ParticipantHandler> participantHandler = new AtomicReference<>();
+
+ /**
+ * Listens for messages on the topic, decodes them into a message, and then dispatches them.
+ */
+ private final MessageTypeDispatcher msgDispatcher;
+
+ /**
+ * Instantiate the activator for participant.
+ *
+ * @param parameters the parameters for the participant intermediary
+ */
+ public IntermediaryActivator(final ParticipantIntermediaryParameters parameters) {
+ this.parameters = parameters;
+
+ topicSinks =
+ TopicEndpointManager.getManager().addTopicSinks(parameters.getClampControlLoopTopics().getTopicSinks());
+
+ topicSources = TopicEndpointManager.getManager()
+ .addTopicSources(parameters.getClampControlLoopTopics().getTopicSources());
+
+ try {
+ this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ } catch (final RuntimeException e) {
+ throw new ControlLoopRuntimeException(Status.INTERNAL_SERVER_ERROR,
+ "topic message dispatcher failed to start", e);
+ }
+
+ // @formatter:off
+ final AtomicReference<ParticipantStatusPublisher> statusPublisher = new AtomicReference<>();
+ final AtomicReference<ControlLoopUpdateListener> controlLoopUpdateListener = new AtomicReference<>();
+
+ addAction("Topic endpoint management",
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
+ addAction("Participant Status Publisher",
+ () -> statusPublisher.set(new ParticipantStatusPublisher(topicSinks)),
+ () -> statusPublisher.get().close());
+
+ addAction("Participant Handler",
+ () -> participantHandler.set(new ParticipantHandler(parameters, statusPublisher.get())),
+ () -> participantHandler.get().close());
+
+ addAction("Control Loop Update Listener",
+ () -> controlLoopUpdateListener.set(new ControlLoopUpdateListener(participantHandler.get())),
+ () -> controlLoopUpdateListener.get().close());
+
+ addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+ // @formatter:on
+ }
+
+ /**
+ * Registers the dispatcher with the topic source(s).
+ */
+ private void registerMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.register(msgDispatcher);
+ }
+ }
+
+ /**
+ * Unregisters the dispatcher from the topic source(s).
+ */
+ private void unregisterMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.unregister(msgDispatcher);
+ }
+ }
+
+ /**
+ * Return the participant handler.
+ */
+ public ParticipantHandler getParticipantHandler() {
+ return participantHandler.get();
+ }
+}
diff --git a/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
new file mode 100644
index 000000000..1150471ae
--- /dev/null
+++ b/tosca-controlloop/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java
@@ -0,0 +1,204 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.controlloop.participant.intermediary.handler;
+
+import java.io.Closeable;
+import java.util.Objects;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus;
+import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus;
+import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender;
+import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher;
+import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantIntermediaryParameters;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is responsible for managing the state of a participant.
+ */
+@Getter
+public class ParticipantHandler implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantHandler.class);
+
+ private final ToscaConceptIdentifier participantId;
+ private final MessageSender sender;
+ private final ControlLoopHandler controlLoopHandler;
+
+ @Setter
+ private ParticipantState state = ParticipantState.UNKNOWN;
+
+ @Setter
+ private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN;
+
+ /**
+ * Constructor, set the participant ID and sender.
+ *
+ * @param parameters the parameters of the participant
+ * @param publisher the publisher for sending responses to messages
+ */
+ public ParticipantHandler(ParticipantIntermediaryParameters parameters, ParticipantStatusPublisher publisher) {
+ this.participantId = parameters.getParticipantId();
+ this.sender = new MessageSender(this, publisher, parameters.getReportingTimeInterval());
+ this.controlLoopHandler = new ControlLoopHandler(parameters, sender);
+ }
+
+ @Override
+ public void close() {
+ sender.close();
+ controlLoopHandler.close();
+ }
+
+ /**
+ * Method which handles a participant state change event from clamp.
+ *
+ * @param stateChangeMsg participant state change message
+ */
+ public void handleParticipantStateChange(final ParticipantStateChange stateChangeMsg) {
+
+ if (!stateChangeMsg.appliesTo(participantId)) {
+ return;
+ }
+
+ ParticipantResponseDetails response = new ParticipantResponseDetails(stateChangeMsg);
+
+ switch (stateChangeMsg.getState()) {
+ case PASSIVE:
+ handlePassiveState(response);
+ break;
+ case ACTIVE:
+ handleActiveState(response);
+ break;
+ case SAFE:
+ handleSafeState(response);
+ break;
+ case TEST:
+ handleTestState(response);
+ break;
+ case TERMINATED:
+ handleTerminatedState(response);
+ break;
+ default:
+ LOGGER.debug("StateChange message has no state, state is null {}", stateChangeMsg.getParticipantId());
+ response.setResponseStatus(ParticipantResponseStatus.FAIL);
+ response.setResponseMessage("StateChange message has invalid state for participantId "
+ + stateChangeMsg.getParticipantId());
+ break;
+ }
+
+ sender.sendResponse(response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is active.
+ *
+ * @param response participant response
+ */
+ private void handleActiveState(final ParticipantResponseDetails response) {
+ handleStateChange(ParticipantState.ACTIVE, response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is passive.
+ *
+ * @param response participant response
+ */
+ private void handlePassiveState(final ParticipantResponseDetails response) {
+ handleStateChange(ParticipantState.PASSIVE, response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is safe.
+ *
+ * @param response participant response
+ */
+ private void handleSafeState(final ParticipantResponseDetails response) {
+ handleStateChange(ParticipantState.SAFE, response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is TEST.
+ *
+ * @param response participant response
+ */
+ private void handleTestState(final ParticipantResponseDetails response) {
+ handleStateChange(ParticipantState.TEST, response);
+ }
+
+ /**
+ * Method to handle when the new state from participant is Terminated.
+ *
+ * @param response participant response
+ */
+ private void handleTerminatedState(final ParticipantResponseDetails response) {
+ handleStateChange(ParticipantState.TERMINATED, response);
+ }
+
+ private void handleStateChange(ParticipantState newParticipantState, ParticipantResponseDetails response) {
+ if (state.equals(newParticipantState)) {
+ response.setResponseStatus(ParticipantResponseStatus.SUCCESS);
+ response.setResponseMessage("Participant already in state " + newParticipantState);
+ } else {
+ response.setResponseStatus(ParticipantResponseStatus.SUCCESS);
+ response.setResponseMessage("Participant state changed from " + state + " to " + newParticipantState);
+ state = newParticipantState;
+ }
+ }
+
+ /**
+ * Method to update participant state.
+ *
+ * @param definition participant definition
+ * @param participantState participant state
+ */
+ public Participant updateParticipantState(ToscaConceptIdentifier definition,
+ ParticipantState participantState) {
+ if (!Objects.equals(definition, participantId)) {
+ LOGGER.debug("No participant with this ID {}", definition.getName());
+ return null;
+ }
+ ParticipantResponseDetails response = new ParticipantResponseDetails();
+ handleStateChange(participantState, response);
+ sender.sendResponse(response);
+ return getParticipant(definition.getName(), definition.getVersion());
+ }
+
+ /**
+ * Get participants as a {@link Participant} class.
+ *
+ * @return the participant
+ */
+ public Participant getParticipant(String name, String version) {
+ if (participantId.getName().equals(name)) {
+ Participant participant = new Participant();
+ participant.setDefinition(participantId);
+ participant.setParticipantState(state);
+ participant.setHealthStatus(healthStatus);
+ return participant;
+ }
+ return null;
+ }
+}