aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java')
-rw-r--r--services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java106
1 files changed, 63 insertions, 43 deletions
diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java b/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java
index 5058e7cd4..a6bd702aa 100644
--- a/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java
+++ b/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java
@@ -26,15 +26,17 @@ import java.util.Properties;
import lombok.Getter;
import lombok.Setter;
+import org.onap.policy.apex.starter.comm.PdpStateChangeListener;
+import org.onap.policy.apex.starter.comm.PdpStatusPublisher;
+import org.onap.policy.apex.starter.comm.PdpUpdateListener;
import org.onap.policy.apex.starter.exception.ApexStarterException;
-import org.onap.policy.apex.starter.handler.CommunicationHandler;
+import org.onap.policy.apex.starter.exception.ApexStarterRunTimeException;
import org.onap.policy.apex.starter.handler.PdpMessageHandler;
import org.onap.policy.apex.starter.parameters.ApexStarterParameterGroup;
-import org.onap.policy.apex.starter.parameters.PdpStatusParameters;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.parameters.ParameterService;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.common.utils.services.ServiceManager;
import org.onap.policy.common.utils.services.ServiceManagerException;
@@ -50,7 +52,13 @@ public class ApexStarterActivator {
private static final Logger LOGGER = LoggerFactory.getLogger(ApexStarterActivator.class);
private final ApexStarterParameterGroup apexStarterParameterGroup;
- private CommunicationHandler communicationHandler;
+ private List<TopicSink> topicSinks;// topics to which apex-pdp sends pdp status
+ private List<TopicSource> topicSources; // topics to which apex-pdp listens to for messages from pap.
+ private static final String[] MSG_TYPE_NAMES = { "messageName" };
+ /**
+ * Listens for messages on the topic, decodes them into a message, and then dispatches them.
+ */
+ private final MessageTypeDispatcher msgDispatcher;
/**
* Used to manage the services.
@@ -64,14 +72,22 @@ public class ApexStarterActivator {
public ApexStarterActivator(final ApexStarterParameterGroup apexStarterParameterGroup,
final Properties topicProperties) {
- final List<TopicSink> topicSinks = TopicEndpoint.manager.addTopicSinks(topicProperties);
- final List<TopicSource> topicSources = TopicEndpoint.manager.addTopicSources(topicProperties);
- this.apexStarterParameterGroup = apexStarterParameterGroup;
+ topicSinks = TopicEndpoint.manager.addTopicSinks(topicProperties);
+ topicSources = TopicEndpoint.manager.addTopicSources(topicProperties);
// TODO: instanceId currently set as a random string, could be fetched from actual deployment
final int random = (int) (Math.random() * 100);
final String instanceId = "apex_" + random;
+ try {
+ this.apexStarterParameterGroup = apexStarterParameterGroup;
+ this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ } catch (final RuntimeException e) {
+ throw new ApexStarterRunTimeException(e);
+ }
+
+ final PdpUpdateListener pdpUpdateListener = new PdpUpdateListener();
+ final PdpStateChangeListener pdpStateChangeListener = new PdpStateChangeListener();
// @formatter:off
this.manager = new ServiceManager()
.addAction("topics",
@@ -80,22 +96,44 @@ public class ApexStarterActivator {
.addAction("set alive",
() -> setAlive(true),
() -> setAlive(false))
- .addAction("register context map",
+ .addAction("register pdp status context object",
() -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_OBJECT,
new PdpMessageHandler().createPdpStatusFromParameters(instanceId,
apexStarterParameterGroup.getPdpStatusParameters())),
() -> Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_OBJECT))
- .addAction("register parameters",
- () -> registerToParameterService(apexStarterParameterGroup),
- () -> deregisterToParameterService(apexStarterParameterGroup))
- .addAction("Communication handler",
- () -> startCommunicationHandler(topicSinks, topicSources,
- apexStarterParameterGroup.getPdpStatusParameters()),
- () -> communicationHandler.stop());
+ .addAction("topic sinks",
+ () -> Registry.register(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS, topicSinks),
+ () -> Registry.unregister(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS))
+ .addAction("Pdp Status publisher",
+ () -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER,
+ new PdpStatusPublisher(topicSinks,
+ apexStarterParameterGroup.getPdpStatusParameters().getTimeIntervalMs())),
+ () -> stopAndRemovePdpStatusPublisher())
+ .addAction("Register pdp update listener",
+ () -> msgDispatcher.register("PDP_UPDATE", pdpUpdateListener),
+ () -> msgDispatcher.unregister("PDP_UPDATE"))
+ .addAction("Register pdp state change request dispatcher",
+ () -> msgDispatcher.register("PDP_STATE_CHANGE", pdpStateChangeListener),
+ () -> msgDispatcher.unregister("PDP_STATE_CHANGE"))
+ .addAction("Message Dispatcher",
+ () -> registerMsgDispatcher(),
+ () -> unregisterMsgDispatcher());
// @formatter:on
}
/**
+ * Method to stop and unregister the pdp status publisher.
+ */
+ private void stopAndRemovePdpStatusPublisher() {
+ final PdpStatusPublisher pdpStatusPublisher =
+ Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
+ // send a final heartbeat with terminated status
+ pdpStatusPublisher.send(new PdpMessageHandler().getTerminatedPdpStatus());
+ pdpStatusPublisher.terminate();
+ Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER);
+ }
+
+ /**
* Initialize ApexStarter service.
*
* @throws ApexStarterException on errors in initializing the service
@@ -124,9 +162,9 @@ public class ApexStarterActivator {
if (!isAlive()) {
throw new IllegalStateException("activator is not running");
}
-
try {
manager.stop();
+ Registry.unregister(ApexStarterConstants.REG_APEX_STARTER_ACTIVATOR);
} catch (final ServiceManagerException exp) {
LOGGER.error("ApexStarter termination failed");
throw new ApexStarterException(exp.getMessage(), exp);
@@ -142,39 +180,21 @@ public class ApexStarterActivator {
return apexStarterParameterGroup;
}
-
- /**
- * Method to register the parameters to Common Parameter Service.
- *
- * @param apexStarterParameterGroup the apex starter parameter group
- */
- public void registerToParameterService(final ApexStarterParameterGroup apexStarterParameterGroup) {
- ParameterService.register(apexStarterParameterGroup);
- }
-
/**
- * Method to deregister the parameters from Common Parameter Service.
- *
- * @param apexStarterParameterGroup the apex starter parameter group
+ * Registers the dispatcher with the topic source(s).
*/
- public void deregisterToParameterService(final ApexStarterParameterGroup apexStarterParameterGroup) {
- ParameterService.deregister(apexStarterParameterGroup.getName());
+ private void registerMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.register(msgDispatcher);
+ }
}
/**
- * Starts the communication handler which handles the communication between apex pdp and pap.
- *
- * @param pdpStatusParameters
- * @param topicSources
- * @param topicSinks
- *
- * @throws ApexStarterException if the handler start fails
+ * Unregisters the dispatcher from the topic source(s).
*/
- public void startCommunicationHandler(final List<TopicSink> topicSinks, final List<TopicSource> topicSources,
- final PdpStatusParameters pdpStatusParameters) throws ApexStarterException {
- communicationHandler = new CommunicationHandler(topicSinks, topicSources, pdpStatusParameters);
- if (!communicationHandler.start()) {
- throw new ApexStarterException("Failed to start the communication handler for ApexStarter");
+ private void unregisterMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.unregister(msgDispatcher);
}
}
}