diff options
6 files changed, 70 insertions, 72 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java index 0dc8bf54..8ffccbef 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java @@ -23,8 +23,6 @@ package org.onap.policy.pdpx.main.comm; import java.util.Timer; import java.util.TimerTask; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.onap.policy.models.pdp.concepts.PdpStateChange; -import org.onap.policy.models.pdp.enums.PdpState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,11 +32,8 @@ public class XacmlPdpHearbeatPublisher extends TimerTask { private Timer timer; private XacmlPdpMessage heartbeatMessage; - private Object message; private static TopicSinkClient topicSinkClient; private static volatile boolean alive = false; - public static PdpState pdpState; - /** * Constructor for instantiating XacmlPdpPublisher. @@ -46,11 +41,9 @@ public class XacmlPdpHearbeatPublisher extends TimerTask { * @param message of the PDP * @param topicSinkClient used to send heartbeat message */ - public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, PdpStateChange message) { - this.message = message; - this.pdpState = message.getState(); + public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlPdpMessage message ) { this.topicSinkClient = topicSinkClient; - this.heartbeatMessage = new XacmlPdpMessage(); + this.heartbeatMessage = message; timer = new Timer(false); timer.scheduleAtFixedRate(this, 0, 60000); // time interval temp hard coded now but will be parameterized setAlive(true); @@ -58,7 +51,7 @@ public class XacmlPdpHearbeatPublisher extends TimerTask { @Override public void run() { - topicSinkClient.send(heartbeatMessage.formatHeartbeatMessage((PdpStateChange) message)); + topicSinkClient.send(heartbeatMessage.formatPdpStatusMessage()); LOGGER.info("Sending Xacml PDP heartbeat to the PAP"); } @@ -71,11 +64,6 @@ public class XacmlPdpHearbeatPublisher extends TimerTask { setAlive(false); } - public void updateInternalState(PdpState state) { - ((PdpStateChange) this.message).setState(state); - this.pdpState = state; - } - public static boolean isAlive() { return alive; } diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java index 1253ff28..802d735a 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java @@ -22,6 +22,7 @@ package org.onap.policy.pdpx.main.comm; +import lombok.Getter; import org.onap.policy.common.utils.network.NetworkUtil; import org.onap.policy.models.pdp.concepts.PdpStateChange; import org.onap.policy.models.pdp.concepts.PdpStatus; @@ -33,20 +34,25 @@ import org.onap.policy.pdpx.main.startstop.XacmlPdpActivator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Getter public class XacmlPdpMessage { // The logger for this class private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpMessage.class); + private String pdpGroup; + private String pdpSubGroup; + private PdpState pdpState; + private String pdpName = NetworkUtil.getHostname(); /** - * Method used to format the status message. + * Method used to format the initial registration status message. * * @param state of the PDP * @return status message of the PDP */ - public PdpStatus formatStatusMessage(PdpState state) { + public PdpStatus formatInitialStatusMessage(PdpState state) { PdpStatus status = new PdpStatus(); - status.setName(NetworkUtil.getHostname()); + status.setName(pdpName); if (XacmlPdpActivator.getCurrent().isAlive()) { status.setHealthy(PdpHealthStatus.HEALTHY); @@ -65,14 +71,13 @@ public class XacmlPdpMessage { } /** - * Method used to format the heartbeat status message. + * Method used to format the PdpStatus message for heartbeat and PDP Updates. * - * @param message PdpStateChange message received from the PAP * @return status message of the PDP */ - public PdpStatus formatHeartbeatMessage(PdpStateChange message) { + public PdpStatus formatPdpStatusMessage() { PdpStatus status = new PdpStatus(); - status.setName(NetworkUtil.getHostname()); + status.setName(pdpName); if (XacmlPdpActivator.getCurrent().isAlive()) { status.setHealthy(PdpHealthStatus.HEALTHY); @@ -81,37 +86,29 @@ public class XacmlPdpMessage { } status.setPdpType("xacml"); - status.setState(message.getState()); - status.setPdpGroup(message.getPdpGroup()); - status.setPdpSubgroup(message.getPdpSubgroup()); + status.setState(pdpState); + status.setPdpGroup(pdpGroup); + status.setPdpSubgroup(pdpSubGroup); status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents()); + status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers()); return status; } /** - * Method used to format the PdpUpdate message. - * - * @param message PdpUpdate message that was received from the PAP - * @return status message of the PDP + * Method used to update PDP status attributes from PdpStateChange. */ - public PdpStatus formatPdpUpdateMessage(PdpUpdate message, PdpState state) { - PdpStatus status = new PdpStatus(); - status.setName(NetworkUtil.getHostname()); - - if (XacmlPdpActivator.getCurrent().isAlive()) { - status.setHealthy(PdpHealthStatus.HEALTHY); - } else { - status.setHealthy(PdpHealthStatus.NOT_HEALTHY); - } - - status.setPdpType("xacml"); - status.setState(state); - status.setPdpGroup(message.getPdpGroup()); - status.setPdpSubgroup(message.getPdpSubgroup()); - status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents()); - status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers()); + public void updateInternalStatus(PdpStateChange message) { + pdpGroup = message.getPdpGroup(); + pdpSubGroup = message.getPdpSubgroup(); + pdpState = message.getState(); + } - return status; + /** + * Method used to update PDP status attributes from PdpUpdate. + */ + public void updateInternalStatus(PdpUpdate message) { + pdpGroup = message.getPdpGroup(); + pdpSubGroup = message.getPdpSubgroup(); } } diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java index 4c9d0c21..54d9cf65 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java @@ -44,7 +44,8 @@ public class XacmlPdpUpdatePublisher { * @param message Incoming message * @param client TopicSinkClient */ - public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client) { + public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client, + XacmlPdpMessage updatePdpMessage) { if (!message.getPolicies().isEmpty() || message.getPolicies() != null) { @@ -70,8 +71,8 @@ public class XacmlPdpUpdatePublisher { } } - XacmlPdpMessage updatePdpMessage = new XacmlPdpMessage(); - PdpStatus statusMessage = updatePdpMessage.formatPdpUpdateMessage(message, XacmlPdpHearbeatPublisher.pdpState); + updatePdpMessage.updateInternalStatus(message); + PdpStatus statusMessage = updatePdpMessage.formatPdpStatusMessage(); sendPdpUpdate(statusMessage, client); } diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java index 84572d92..3102edb1 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java @@ -40,40 +40,44 @@ public class XacmlPdpStateChangeListener extends ScoListener<PdpStateChange> { private TopicSinkClient client; private XacmlPdpHearbeatPublisher heartbeat; + private XacmlPdpMessage pdpInternalStatus; /** * Constructs the object. * * @param client used to send back response after receiving state change message */ - public XacmlPdpStateChangeListener(TopicSinkClient client) { + public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) { super(PdpStateChange.class); PdpStateChange message = new PdpStateChange(); message.setState(PdpState.PASSIVE); - heartbeat = new XacmlPdpHearbeatPublisher(client, message); + this.pdpInternalStatus = pdpStatusMessage; this.client = client; + this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage); } @Override public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) { - XacmlPdpMessage newMessage = new XacmlPdpMessage(); try { - PdpStatus newStatus = newMessage.formatStatusMessage(message.getState()); - // Send State Change Status to PAP - if (!client.send(newStatus)) { - LOGGER.error("failed to send to topic sink {}", client.getTopic()); - throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic()); - } + if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(), + pdpInternalStatus.getPdpSubGroup())) { - // Update the heartbeat internal state if publisher is running else create new publisher - if (XacmlPdpHearbeatPublisher.isAlive()) { - heartbeat.updateInternalState(message.getState()); - } else { - heartbeat = new XacmlPdpHearbeatPublisher(client, message); - } + pdpInternalStatus.updateInternalStatus(message); + PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage(); + // Send State Change Status to PAP + if (!client.send(newStatus)) { + LOGGER.error("failed to send to topic sink {}", client.getTopic()); + throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic()); + } + + // Starte new heartbeat if publisher is NOT alive + if (!XacmlPdpHearbeatPublisher.isAlive()) { + heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus); + } + } } catch (final Exception e) { LOGGER.error("failed to handle the PDP State Change message.", e); } diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java index 69f96a05..01d19160 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java @@ -25,6 +25,7 @@ import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.pdpx.main.comm.XacmlPdpMessage; import org.onap.policy.pdpx.main.comm.XacmlPdpUpdatePublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +35,17 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> { private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpStateChangeListener.class); private TopicSinkClient client; + private XacmlPdpMessage pdpInternalStatus; /** * Constructs the object. * * @param client used to send back response after receiving state change message */ - public XacmlPdpUpdateListener(TopicSinkClient client) { + public XacmlPdpUpdateListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) { super(PdpUpdate.class); this.client = client; + this.pdpInternalStatus = pdpStatusMessage; } @Override @@ -51,7 +54,12 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> { try { LOGGER.info("PDP update message has been received from the PAP - {}", message.toString()); - XacmlPdpUpdatePublisher.handlePdpUpdate(message, client); + + if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(), + pdpInternalStatus.getPdpSubGroup())) { + + XacmlPdpUpdatePublisher.handlePdpUpdate(message, client, pdpInternalStatus); + } } catch (final Exception e) { LOGGER.error("failed to handle the PDP Update message.", e); diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java index 70dd2c42..4db11d0f 100644 --- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java +++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java @@ -104,13 +104,13 @@ public class XacmlPdpActivator extends ServiceManagerContainer { TopicEndpoint.manager.addTopicSources(topicProperties); try { - TopicSinkClient sinkClient = new TopicSinkClient(TOPIC); + final TopicSinkClient sinkClient = new TopicSinkClient(TOPIC); + this.message = new XacmlPdpMessage(); this.xacmlPdpParameterGroup = xacmlPdpParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); - this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient); - this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient); + this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient, message); + this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient, message); this.register = new XacmlPdpPapRegistration(sinkClient); - this.message = new XacmlPdpMessage(); } catch (RuntimeException | TopicSinkClientException e) { throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e); } @@ -154,10 +154,10 @@ public class XacmlPdpActivator extends ServiceManagerContainer { addAction("Initial Registration with PAP", () -> { - register.pdpRegistration(message.formatStatusMessage(PdpState.PASSIVE)); + register.pdpRegistration(message.formatInitialStatusMessage(PdpState.PASSIVE)); }, () -> { - register.pdpRegistration(message.formatStatusMessage(PdpState.TERMINATED)); + register.pdpRegistration(message.formatInitialStatusMessage(PdpState.TERMINATED)); }); // @formatter:on |