diff options
Diffstat (limited to 'services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java')
-rw-r--r-- | services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java | 106 |
1 files changed, 63 insertions, 43 deletions
diff --git a/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java b/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java index 5058e7cd4..a6bd702aa 100644 --- a/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java +++ b/services/services-onappf/src/main/java/org/onap/policy/apex/starter/ApexStarterActivator.java @@ -26,15 +26,17 @@ import java.util.Properties; import lombok.Getter; import lombok.Setter; +import org.onap.policy.apex.starter.comm.PdpStateChangeListener; +import org.onap.policy.apex.starter.comm.PdpStatusPublisher; +import org.onap.policy.apex.starter.comm.PdpUpdateListener; import org.onap.policy.apex.starter.exception.ApexStarterException; -import org.onap.policy.apex.starter.handler.CommunicationHandler; +import org.onap.policy.apex.starter.exception.ApexStarterRunTimeException; import org.onap.policy.apex.starter.handler.PdpMessageHandler; import org.onap.policy.apex.starter.parameters.ApexStarterParameterGroup; -import org.onap.policy.apex.starter.parameters.PdpStatusParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; -import org.onap.policy.common.parameters.ParameterService; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.common.utils.services.ServiceManager; import org.onap.policy.common.utils.services.ServiceManagerException; @@ -50,7 +52,13 @@ public class ApexStarterActivator { private static final Logger LOGGER = LoggerFactory.getLogger(ApexStarterActivator.class); private final ApexStarterParameterGroup apexStarterParameterGroup; - private CommunicationHandler communicationHandler; + private List<TopicSink> topicSinks;// topics to which apex-pdp sends pdp status + private List<TopicSource> topicSources; // topics to which apex-pdp listens to for messages from pap. + private static final String[] MSG_TYPE_NAMES = { "messageName" }; + /** + * Listens for messages on the topic, decodes them into a message, and then dispatches them. + */ + private final MessageTypeDispatcher msgDispatcher; /** * Used to manage the services. @@ -64,14 +72,22 @@ public class ApexStarterActivator { public ApexStarterActivator(final ApexStarterParameterGroup apexStarterParameterGroup, final Properties topicProperties) { - final List<TopicSink> topicSinks = TopicEndpoint.manager.addTopicSinks(topicProperties); - final List<TopicSource> topicSources = TopicEndpoint.manager.addTopicSources(topicProperties); - this.apexStarterParameterGroup = apexStarterParameterGroup; + topicSinks = TopicEndpoint.manager.addTopicSinks(topicProperties); + topicSources = TopicEndpoint.manager.addTopicSources(topicProperties); // TODO: instanceId currently set as a random string, could be fetched from actual deployment final int random = (int) (Math.random() * 100); final String instanceId = "apex_" + random; + try { + this.apexStarterParameterGroup = apexStarterParameterGroup; + this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + } catch (final RuntimeException e) { + throw new ApexStarterRunTimeException(e); + } + + final PdpUpdateListener pdpUpdateListener = new PdpUpdateListener(); + final PdpStateChangeListener pdpStateChangeListener = new PdpStateChangeListener(); // @formatter:off this.manager = new ServiceManager() .addAction("topics", @@ -80,22 +96,44 @@ public class ApexStarterActivator { .addAction("set alive", () -> setAlive(true), () -> setAlive(false)) - .addAction("register context map", + .addAction("register pdp status context object", () -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_OBJECT, new PdpMessageHandler().createPdpStatusFromParameters(instanceId, apexStarterParameterGroup.getPdpStatusParameters())), () -> Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_OBJECT)) - .addAction("register parameters", - () -> registerToParameterService(apexStarterParameterGroup), - () -> deregisterToParameterService(apexStarterParameterGroup)) - .addAction("Communication handler", - () -> startCommunicationHandler(topicSinks, topicSources, - apexStarterParameterGroup.getPdpStatusParameters()), - () -> communicationHandler.stop()); + .addAction("topic sinks", + () -> Registry.register(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS, topicSinks), + () -> Registry.unregister(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS)) + .addAction("Pdp Status publisher", + () -> Registry.register(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, + new PdpStatusPublisher(topicSinks, + apexStarterParameterGroup.getPdpStatusParameters().getTimeIntervalMs())), + () -> stopAndRemovePdpStatusPublisher()) + .addAction("Register pdp update listener", + () -> msgDispatcher.register("PDP_UPDATE", pdpUpdateListener), + () -> msgDispatcher.unregister("PDP_UPDATE")) + .addAction("Register pdp state change request dispatcher", + () -> msgDispatcher.register("PDP_STATE_CHANGE", pdpStateChangeListener), + () -> msgDispatcher.unregister("PDP_STATE_CHANGE")) + .addAction("Message Dispatcher", + () -> registerMsgDispatcher(), + () -> unregisterMsgDispatcher()); // @formatter:on } /** + * Method to stop and unregister the pdp status publisher. + */ + private void stopAndRemovePdpStatusPublisher() { + final PdpStatusPublisher pdpStatusPublisher = + Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class); + // send a final heartbeat with terminated status + pdpStatusPublisher.send(new PdpMessageHandler().getTerminatedPdpStatus()); + pdpStatusPublisher.terminate(); + Registry.unregister(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER); + } + + /** * Initialize ApexStarter service. * * @throws ApexStarterException on errors in initializing the service @@ -124,9 +162,9 @@ public class ApexStarterActivator { if (!isAlive()) { throw new IllegalStateException("activator is not running"); } - try { manager.stop(); + Registry.unregister(ApexStarterConstants.REG_APEX_STARTER_ACTIVATOR); } catch (final ServiceManagerException exp) { LOGGER.error("ApexStarter termination failed"); throw new ApexStarterException(exp.getMessage(), exp); @@ -142,39 +180,21 @@ public class ApexStarterActivator { return apexStarterParameterGroup; } - - /** - * Method to register the parameters to Common Parameter Service. - * - * @param apexStarterParameterGroup the apex starter parameter group - */ - public void registerToParameterService(final ApexStarterParameterGroup apexStarterParameterGroup) { - ParameterService.register(apexStarterParameterGroup); - } - /** - * Method to deregister the parameters from Common Parameter Service. - * - * @param apexStarterParameterGroup the apex starter parameter group + * Registers the dispatcher with the topic source(s). */ - public void deregisterToParameterService(final ApexStarterParameterGroup apexStarterParameterGroup) { - ParameterService.deregister(apexStarterParameterGroup.getName()); + private void registerMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.register(msgDispatcher); + } } /** - * Starts the communication handler which handles the communication between apex pdp and pap. - * - * @param pdpStatusParameters - * @param topicSources - * @param topicSinks - * - * @throws ApexStarterException if the handler start fails + * Unregisters the dispatcher from the topic source(s). */ - public void startCommunicationHandler(final List<TopicSink> topicSinks, final List<TopicSource> topicSources, - final PdpStatusParameters pdpStatusParameters) throws ApexStarterException { - communicationHandler = new CommunicationHandler(topicSinks, topicSources, pdpStatusParameters); - if (!communicationHandler.start()) { - throw new ApexStarterException("Failed to start the communication handler for ApexStarter"); + private void unregisterMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.unregister(msgDispatcher); } } } |