summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java18
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java59
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java7
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java34
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java12
-rw-r--r--main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java12
6 files changed, 70 insertions, 72 deletions
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
index 0dc8bf54..8ffccbef 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
@@ -23,8 +23,6 @@ package org.onap.policy.pdpx.main.comm;
import java.util.Timer;
import java.util.TimerTask;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.models.pdp.concepts.PdpStateChange;
-import org.onap.policy.models.pdp.enums.PdpState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +32,8 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
private Timer timer;
private XacmlPdpMessage heartbeatMessage;
- private Object message;
private static TopicSinkClient topicSinkClient;
private static volatile boolean alive = false;
- public static PdpState pdpState;
-
/**
* Constructor for instantiating XacmlPdpPublisher.
@@ -46,11 +41,9 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
* @param message of the PDP
* @param topicSinkClient used to send heartbeat message
*/
- public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, PdpStateChange message) {
- this.message = message;
- this.pdpState = message.getState();
+ public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlPdpMessage message ) {
this.topicSinkClient = topicSinkClient;
- this.heartbeatMessage = new XacmlPdpMessage();
+ this.heartbeatMessage = message;
timer = new Timer(false);
timer.scheduleAtFixedRate(this, 0, 60000); // time interval temp hard coded now but will be parameterized
setAlive(true);
@@ -58,7 +51,7 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
@Override
public void run() {
- topicSinkClient.send(heartbeatMessage.formatHeartbeatMessage((PdpStateChange) message));
+ topicSinkClient.send(heartbeatMessage.formatPdpStatusMessage());
LOGGER.info("Sending Xacml PDP heartbeat to the PAP");
}
@@ -71,11 +64,6 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
setAlive(false);
}
- public void updateInternalState(PdpState state) {
- ((PdpStateChange) this.message).setState(state);
- this.pdpState = state;
- }
-
public static boolean isAlive() {
return alive;
}
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java
index 1253ff28..802d735a 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java
@@ -22,6 +22,7 @@
package org.onap.policy.pdpx.main.comm;
+import lombok.Getter;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
@@ -33,20 +34,25 @@ import org.onap.policy.pdpx.main.startstop.XacmlPdpActivator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Getter
public class XacmlPdpMessage {
// The logger for this class
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpMessage.class);
+ private String pdpGroup;
+ private String pdpSubGroup;
+ private PdpState pdpState;
+ private String pdpName = NetworkUtil.getHostname();
/**
- * Method used to format the status message.
+ * Method used to format the initial registration status message.
*
* @param state of the PDP
* @return status message of the PDP
*/
- public PdpStatus formatStatusMessage(PdpState state) {
+ public PdpStatus formatInitialStatusMessage(PdpState state) {
PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
+ status.setName(pdpName);
if (XacmlPdpActivator.getCurrent().isAlive()) {
status.setHealthy(PdpHealthStatus.HEALTHY);
@@ -65,14 +71,13 @@ public class XacmlPdpMessage {
}
/**
- * Method used to format the heartbeat status message.
+ * Method used to format the PdpStatus message for heartbeat and PDP Updates.
*
- * @param message PdpStateChange message received from the PAP
* @return status message of the PDP
*/
- public PdpStatus formatHeartbeatMessage(PdpStateChange message) {
+ public PdpStatus formatPdpStatusMessage() {
PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
+ status.setName(pdpName);
if (XacmlPdpActivator.getCurrent().isAlive()) {
status.setHealthy(PdpHealthStatus.HEALTHY);
@@ -81,37 +86,29 @@ public class XacmlPdpMessage {
}
status.setPdpType("xacml");
- status.setState(message.getState());
- status.setPdpGroup(message.getPdpGroup());
- status.setPdpSubgroup(message.getPdpSubgroup());
+ status.setState(pdpState);
+ status.setPdpGroup(pdpGroup);
+ status.setPdpSubgroup(pdpSubGroup);
status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
+ status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
return status;
}
/**
- * Method used to format the PdpUpdate message.
- *
- * @param message PdpUpdate message that was received from the PAP
- * @return status message of the PDP
+ * Method used to update PDP status attributes from PdpStateChange.
*/
- public PdpStatus formatPdpUpdateMessage(PdpUpdate message, PdpState state) {
- PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
-
- if (XacmlPdpActivator.getCurrent().isAlive()) {
- status.setHealthy(PdpHealthStatus.HEALTHY);
- } else {
- status.setHealthy(PdpHealthStatus.NOT_HEALTHY);
- }
-
- status.setPdpType("xacml");
- status.setState(state);
- status.setPdpGroup(message.getPdpGroup());
- status.setPdpSubgroup(message.getPdpSubgroup());
- status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
- status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
+ public void updateInternalStatus(PdpStateChange message) {
+ pdpGroup = message.getPdpGroup();
+ pdpSubGroup = message.getPdpSubgroup();
+ pdpState = message.getState();
+ }
- return status;
+ /**
+ * Method used to update PDP status attributes from PdpUpdate.
+ */
+ public void updateInternalStatus(PdpUpdate message) {
+ pdpGroup = message.getPdpGroup();
+ pdpSubGroup = message.getPdpSubgroup();
}
}
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java
index 4c9d0c21..54d9cf65 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java
@@ -44,7 +44,8 @@ public class XacmlPdpUpdatePublisher {
* @param message Incoming message
* @param client TopicSinkClient
*/
- public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client) {
+ public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client,
+ XacmlPdpMessage updatePdpMessage) {
if (!message.getPolicies().isEmpty() || message.getPolicies() != null) {
@@ -70,8 +71,8 @@ public class XacmlPdpUpdatePublisher {
}
}
- XacmlPdpMessage updatePdpMessage = new XacmlPdpMessage();
- PdpStatus statusMessage = updatePdpMessage.formatPdpUpdateMessage(message, XacmlPdpHearbeatPublisher.pdpState);
+ updatePdpMessage.updateInternalStatus(message);
+ PdpStatus statusMessage = updatePdpMessage.formatPdpStatusMessage();
sendPdpUpdate(statusMessage, client);
}
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java
index 84572d92..3102edb1 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java
@@ -40,40 +40,44 @@ public class XacmlPdpStateChangeListener extends ScoListener<PdpStateChange> {
private TopicSinkClient client;
private XacmlPdpHearbeatPublisher heartbeat;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpStateChangeListener(TopicSinkClient client) {
+ public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpStateChange.class);
PdpStateChange message = new PdpStateChange();
message.setState(PdpState.PASSIVE);
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
+ this.pdpInternalStatus = pdpStatusMessage;
this.client = client;
+ this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
}
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
- XacmlPdpMessage newMessage = new XacmlPdpMessage();
try {
- PdpStatus newStatus = newMessage.formatStatusMessage(message.getState());
- // Send State Change Status to PAP
- if (!client.send(newStatus)) {
- LOGGER.error("failed to send to topic sink {}", client.getTopic());
- throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
- }
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
- // Update the heartbeat internal state if publisher is running else create new publisher
- if (XacmlPdpHearbeatPublisher.isAlive()) {
- heartbeat.updateInternalState(message.getState());
- } else {
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
- }
+ pdpInternalStatus.updateInternalStatus(message);
+ PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
+ // Send State Change Status to PAP
+ if (!client.send(newStatus)) {
+ LOGGER.error("failed to send to topic sink {}", client.getTopic());
+ throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
+ }
+
+ // Starte new heartbeat if publisher is NOT alive
+ if (!XacmlPdpHearbeatPublisher.isAlive()) {
+ heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
+ }
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP State Change message.", e);
}
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java
index 69f96a05..01d19160 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java
@@ -25,6 +25,7 @@ import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
import org.onap.policy.pdpx.main.comm.XacmlPdpUpdatePublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,15 +35,17 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> {
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpStateChangeListener.class);
private TopicSinkClient client;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpUpdateListener(TopicSinkClient client) {
+ public XacmlPdpUpdateListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpUpdate.class);
this.client = client;
+ this.pdpInternalStatus = pdpStatusMessage;
}
@Override
@@ -51,7 +54,12 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> {
try {
LOGGER.info("PDP update message has been received from the PAP - {}", message.toString());
- XacmlPdpUpdatePublisher.handlePdpUpdate(message, client);
+
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
+
+ XacmlPdpUpdatePublisher.handlePdpUpdate(message, client, pdpInternalStatus);
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP Update message.", e);
diff --git a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
index 70dd2c42..4db11d0f 100644
--- a/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
+++ b/main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java
@@ -104,13 +104,13 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
TopicEndpoint.manager.addTopicSources(topicProperties);
try {
- TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+ final TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+ this.message = new XacmlPdpMessage();
this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient);
- this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient);
+ this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient, message);
+ this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient, message);
this.register = new XacmlPdpPapRegistration(sinkClient);
- this.message = new XacmlPdpMessage();
} catch (RuntimeException | TopicSinkClientException e) {
throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
}
@@ -154,10 +154,10 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
addAction("Initial Registration with PAP",
() -> {
- register.pdpRegistration(message.formatStatusMessage(PdpState.PASSIVE));
+ register.pdpRegistration(message.formatInitialStatusMessage(PdpState.PASSIVE));
},
() -> {
- register.pdpRegistration(message.formatStatusMessage(PdpState.TERMINATED));
+ register.pdpRegistration(message.formatInitialStatusMessage(PdpState.TERMINATED));
});
// @formatter:on