aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main
diff options
context:
space:
mode:
authorjh7358 <jh7358@att.com>2019-10-16 11:51:46 -0400
committerJim Hahn <jrh3@att.com>2019-10-29 11:09:10 -0400
commit4f3fe21aa98a196ef1ddd62e1ac84a6c2e4c13cf (patch)
tree682a1ce5779a094318a0143aeb993ffb4d3ff59d /main/src/main
parent45c8e61888711a3d6f5913d1d7ddf640ff995b12 (diff)
Generate notifications when policies change
Updated existing PAP code to make use of new notification classes. Change-Id: I4637ad92ac4f432f215cfc837e672c75074d88b5 Issue-ID: POLICY-1841 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'main/src/main')
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/PapConstants.java2
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java15
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java14
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java8
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java10
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java3
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java14
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java24
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java9
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java11
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java68
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java37
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java102
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java24
14 files changed, 292 insertions, 49 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
index 3fc36f35..b8b199e6 100644
--- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
+++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
@@ -31,10 +31,12 @@ public class PapConstants {
public static final String REG_PDP_MODIFY_LOCK = "lock:pdp";
public static final String REG_PDP_MODIFY_MAP = "object:pdp/modify/map";
public static final String REG_PDP_TRACKER = "object:pdp/tracker";
+ public static final String REG_POLICY_NOTIFIER = "object:policy/notifier";
public static final String REG_PAP_DAO_FACTORY = "object:pap/dao/factory";
// topic names
public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP";
+ public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION";
private PapConstants() {
super();
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
index 3f0b5c11..f058da32 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
@@ -41,6 +41,7 @@ import org.onap.policy.pap.main.comm.msgdata.Request;
import org.onap.policy.pap.main.comm.msgdata.RequestListener;
import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
import org.onap.policy.pap.main.parameters.RequestParams;
import org.slf4j.Logger;
@@ -74,6 +75,11 @@ public class PdpModifyRequestMap {
*/
private final PolicyModelsProviderFactoryWrapper daoFactory;
+ /**
+ * Used to notify when policy updates completes.
+ */
+ private final PolicyNotifier policyNotifier;
+
/**
* Constructs the object.
@@ -88,6 +94,7 @@ public class PdpModifyRequestMap {
this.params = params;
this.modifyLock = params.getModifyLock();
this.daoFactory = params.getDaoFactory();
+ this.policyNotifier = params.getPolicyNotifier();
}
/**
@@ -150,7 +157,7 @@ public class PdpModifyRequestMap {
.setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
.setTimers(params.getUpdateTimers())
.setModifyLock(params.getModifyLock())
- .setPublisher(params.getPublisher())
+ .setPdpPublisher(params.getPdpPublisher())
.setResponseDispatcher(params.getResponseDispatcher());
// @formatter:on
@@ -179,7 +186,7 @@ public class PdpModifyRequestMap {
.setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
.setTimers(params.getStateChangeTimers())
.setModifyLock(params.getModifyLock())
- .setPublisher(params.getPublisher())
+ .setPdpPublisher(params.getPdpPublisher())
.setResponseDispatcher(params.getResponseDispatcher());
// @formatter:on
@@ -299,7 +306,7 @@ public class PdpModifyRequestMap {
* @return a new set of requests
*/
protected PdpRequests makePdpRequests(String pdpName) {
- return new PdpRequests(pdpName);
+ return new PdpRequests(pdpName, policyNotifier);
}
/**
@@ -359,6 +366,8 @@ public class PdpModifyRequestMap {
*/
private void disablePdp(PdpRequests requests) {
+ policyNotifier.removePdp(requests.getPdpName());
+
// remove the requests from the map
if (!pdp2requests.remove(requests.getPdpName(), requests)) {
// don't have the info we need to disable it
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java
index 5863b2cb..4e1e9233 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java
@@ -23,6 +23,7 @@ package org.onap.policy.pap.main.comm;
import lombok.Getter;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.pap.main.comm.msgdata.Request;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,12 @@ public class PdpRequests {
private final String pdpName;
/**
+ * Notifier for policy update completions.
+ */
+ @Getter
+ private final PolicyNotifier notifier;
+
+ /**
* Index of request currently being published.
*/
private int curIndex = 0;
@@ -60,8 +67,9 @@ public class PdpRequests {
*
* @param pdpName name of the PDP with which the requests are associated
*/
- public PdpRequests(String pdpName) {
+ public PdpRequests(String pdpName, PolicyNotifier notifier) {
this.pdpName = pdpName;
+ this.notifier = notifier;
}
/**
@@ -71,6 +79,8 @@ public class PdpRequests {
*/
public void addSingleton(Request request) {
+ request.setNotifier(notifier);
+
if (request.getMessage().getName() == null) {
throw new IllegalArgumentException("unexpected broadcast for " + pdpName);
}
@@ -86,7 +96,7 @@ public class PdpRequests {
singletons[priority] = request;
// stop publishing anything of a lower priority
- QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority);
+ final QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority);
// start publishing if nothing of higher priority
if (higherPrioritySingleton(priority)) {
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java
index 1f69dcb5..62aea789 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java
@@ -23,6 +23,7 @@ package org.onap.policy.pap.main.comm.msgdata;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.comm.QueueToken;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
/**
* Request data, whose message may be changed at any point, possibly triggering a restart
@@ -61,6 +62,13 @@ public interface Request {
public void setListener(RequestListener listener);
/**
+ * Sets the notifier to track responses to the request.
+ *
+ * @param notifier notifier used to publish notifications
+ */
+ public void setNotifier(PolicyNotifier notifier);
+
+ /**
* Determines if this request is currently being published.
*
* @return {@code true} if this request is being published, {@code false} otherwise
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java
index 1945b32d..a110ef44 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java
@@ -30,6 +30,7 @@ import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.comm.QueueToken;
import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.RequestParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,13 @@ public abstract class RequestImpl implements Request {
private RequestListener listener;
/**
+ * Notifier for policy update completions.
+ */
+ @Getter
+ @Setter
+ private PolicyNotifier notifier;
+
+ /**
* Current retry count.
*/
@Getter
@@ -217,7 +225,7 @@ public abstract class RequestImpl implements Request {
// couldn't take the other's place - add our own token to the queue
token = new QueueToken<>(message);
- params.getPublisher().enqueue(token);
+ params.getPdpPublisher().enqueue(token);
}
/**
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java
index e38cb0fd..40acd3ad 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java
@@ -20,7 +20,6 @@
package org.onap.policy.pap.main.comm.msgdata;
-import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.parameters.RequestParams;
@@ -39,7 +38,7 @@ public class StateChangeReq extends RequestImpl {
*
* @throws IllegalArgumentException if a required parameter is not set
*/
- public StateChangeReq(RequestParams params, String name, PdpMessage message) {
+ public StateChangeReq(RequestParams params, String name, PdpStateChange message) {
super(params, name, message);
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java
index 8efdb7ca..6b04e726 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
-import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
@@ -48,7 +47,7 @@ public class UpdateReq extends RequestImpl {
*
* @throws IllegalArgumentException if a required parameter is not set
*/
- public UpdateReq(RequestParams params, String name, PdpMessage message) {
+ public UpdateReq(RequestParams params, String name, PdpUpdate message) {
super(params, name, message);
}
@@ -61,10 +60,15 @@ public class UpdateReq extends RequestImpl {
public String checkResponse(PdpStatus response) {
String reason = super.checkResponse(response);
if (reason != null) {
+ // response isn't for this PDP - don't generate notifications
return reason;
}
+ Set<ToscaPolicyIdentifier> actualSet = new HashSet<>(alwaysList(response.getPolicies()));
+ getNotifier().processResponse(getName(), actualSet);
+
PdpUpdate message = getMessage();
+
if (!StringUtils.equals(message.getPdpGroup(), response.getPdpGroup())) {
return "group does not match";
}
@@ -74,11 +78,11 @@ public class UpdateReq extends RequestImpl {
}
// see if the policies match
- Set<ToscaPolicyIdentifier> set1 = new HashSet<>(alwaysList(response.getPolicies()));
- Set<ToscaPolicyIdentifier> set2 = new HashSet<>(alwaysList(message.getPolicies()).stream()
+
+ Set<ToscaPolicyIdentifier> expectedSet = new HashSet<>(alwaysList(message.getPolicies()).stream()
.map(ToscaPolicy::getIdentifier).collect(Collectors.toSet()));
- if (!set1.equals(set2)) {
+ if (!actualSet.equals(expectedSet)) {
return "policies do not match";
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
index 2f74bf3d..9a3e7a45 100644
--- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
@@ -22,10 +22,12 @@ package org.onap.policy.pap.main.parameters;
import lombok.Getter;
import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
/**
@@ -33,13 +35,14 @@ import org.onap.policy.pap.main.comm.TimerManager;
*/
@Getter
public class PdpModifyRequestMapParams {
- private Publisher publisher;
+ private Publisher<PdpMessage> pdpPublisher;
private RequestIdDispatcher<PdpStatus> responseDispatcher;
private Object modifyLock;
private PdpParameters params;
private TimerManager updateTimers;
private TimerManager stateChangeTimers;
private PolicyModelsProviderFactoryWrapper daoFactory;
+ private PolicyNotifier policyNotifier;
public PdpModifyRequestMapParams setParams(PdpParameters params) {
this.params = params;
@@ -61,8 +64,13 @@ public class PdpModifyRequestMapParams {
return this;
}
- public PdpModifyRequestMapParams setPublisher(Publisher publisher) {
- this.publisher = publisher;
+ public PdpModifyRequestMapParams setPolicyNotifier(PolicyNotifier policyNotifier) {
+ this.policyNotifier = policyNotifier;
+ return this;
+ }
+
+ public PdpModifyRequestMapParams setPdpPublisher(Publisher<PdpMessage> pdpPublisher) {
+ this.pdpPublisher = pdpPublisher;
return this;
}
@@ -80,7 +88,7 @@ public class PdpModifyRequestMapParams {
* Validates the parameters.
*/
public void validate() {
- if (publisher == null) {
+ if (pdpPublisher == null) {
throw new IllegalArgumentException("missing publisher");
}
@@ -103,5 +111,13 @@ public class PdpModifyRequestMapParams {
if (stateChangeTimers == null) {
throw new IllegalArgumentException("missing stateChangeTimers");
}
+
+ if (daoFactory == null) {
+ throw new IllegalArgumentException("missing DAO factory");
+ }
+
+ if (policyNotifier == null) {
+ throw new IllegalArgumentException("missing policy notifier");
+ }
}
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java
index b9083864..57d4ddaa 100644
--- a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java
@@ -22,6 +22,7 @@ package org.onap.policy.pap.main.parameters;
import lombok.Getter;
import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
@@ -32,15 +33,15 @@ import org.onap.policy.pap.main.comm.TimerManager;
*/
@Getter
public class RequestParams {
- private Publisher publisher;
+ private Publisher<PdpMessage> pdpPublisher;
private RequestIdDispatcher<PdpStatus> responseDispatcher;
private Object modifyLock;
private TimerManager timers;
private int maxRetryCount;
- public RequestParams setPublisher(Publisher publisher) {
- this.publisher = publisher;
+ public RequestParams setPdpPublisher(Publisher<PdpMessage> publisher) {
+ this.pdpPublisher = publisher;
return this;
}
@@ -68,7 +69,7 @@ public class RequestParams {
* Validates the parameters.
*/
public void validate() {
- if (publisher == null) {
+ if (pdpPublisher == null) {
throw new IllegalArgumentException("missing publisher");
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java
index 0ca32451..09ad862b 100644
--- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java
+++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java
@@ -21,12 +21,13 @@
package org.onap.policy.pap.main.rest.depundep;
import java.util.Iterator;
-import java.util.function.BiFunction;
+import java.util.Set;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.ws.rs.core.Response.Status;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pdp.concepts.Pdp;
import org.onap.policy.models.pdp.concepts.PdpGroup;
-import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.enums.PdpState;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
@@ -122,7 +123,7 @@ public class PdpGroupDeleteProvider extends ProviderBase {
* Returns a function that will remove the desired policy from a subgroup.
*/
@Override
- protected BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy,
+ protected Updater makeUpdater(SessionData data, ToscaPolicy policy,
ToscaPolicyIdentifierOptVersion desiredIdent) {
// construct a matcher based on whether or not the version was specified
@@ -142,6 +143,8 @@ public class PdpGroupDeleteProvider extends ProviderBase {
// return a function that will remove the policy from the subgroup
return (group, subgroup) -> {
+ Set<String> pdps = subgroup.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet());
+
boolean result = false;
Iterator<ToscaPolicyIdentifier> iter = subgroup.getPolicies().iterator();
@@ -153,6 +156,8 @@ public class PdpGroupDeleteProvider extends ProviderBase {
iter.remove();
logger.info("remove policy {} {} from subgroup {} {} count={}", ident.getName(), ident.getVersion(),
group.getName(), subgroup.getPdpType(), subgroup.getPolicies().size());
+
+ data.trackUndeploy(ident, pdps);
}
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java
index bc3148ef..1e00528e 100644
--- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java
+++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java
@@ -28,8 +28,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import javax.ws.rs.core.Response.Status;
import org.onap.policy.common.parameters.BeanValidationResult;
import org.onap.policy.common.parameters.ObjectValidationResult;
@@ -269,8 +269,9 @@ public class PdpGroupDeployProvider extends ProviderBase {
* @param dbgroup the group, as it appears within the DB
* @param group the group being updated
* @return {@code true} if a subgroup was removed, {@code false} otherwise
+ * @throws PfModelException if an error occurred
*/
- private boolean notifyPdpsDelSubGroups(SessionData data, PdpGroup dbgroup, PdpGroup group) {
+ private boolean notifyPdpsDelSubGroups(SessionData data, PdpGroup dbgroup, PdpGroup group) throws PfModelException {
boolean updated = false;
// subgroups, as they appear within the updated group
@@ -284,6 +285,7 @@ public class PdpGroupDeployProvider extends ProviderBase {
// this subgroup no longer appears - notify its PDPs
updated = true;
notifyPdpsDelSubGroup(data, subgrp);
+ trackPdpsDelSubGroup(data, subgrp);
}
}
@@ -314,10 +316,26 @@ public class PdpGroupDeployProvider extends ProviderBase {
}
/**
+ * Tracks PDP responses when their subgroup is removed.
+ *
+ * @param data session data
+ * @param subgrp subgroup that is being removed
+ * @throws PfModelException if an error occurred
+ */
+ private void trackPdpsDelSubGroup(SessionData data, PdpSubGroup subgrp) throws PfModelException {
+ Set<String> pdps = subgrp.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet());
+
+ for (ToscaPolicyIdentifier policyId : subgrp.getPolicies()) {
+ data.trackUndeploy(policyId, pdps);
+ }
+ }
+
+ /**
* Adds a new subgroup.
*
* @param data session data
- * @param subgrp the subgroup to be added, updated to fully qualified versions upon return
+ * @param subgrp the subgroup to be added, updated to fully qualified versions upon
+ * return
* @return the validation result
* @throws PfModelException if an error occurred
*/
@@ -339,7 +357,8 @@ public class PdpGroupDeployProvider extends ProviderBase {
* @param data session data
* @param dbgroup the group, from the DB, containing the subgroup
* @param dbsub the subgroup, from the DB
- * @param subgrp the subgroup to be updated, updated to fully qualified versions upon return
+ * @param subgrp the subgroup to be updated, updated to fully qualified versions upon
+ * return
* @param container container for additional validation results
* @return {@code true} if the subgroup content was changed, {@code false} if there
* were no changes
@@ -356,7 +375,7 @@ public class PdpGroupDeployProvider extends ProviderBase {
/*
* first, apply the changes about which the PDPs care
*/
- boolean updated = updateList(dbsub.getPolicies(), subgrp.getPolicies(), dbsub::setPolicies);
+ boolean updated = updatePolicies(data, dbsub, subgrp);
// publish any changes to the PDPs
if (updated) {
@@ -373,12 +392,40 @@ public class PdpGroupDeployProvider extends ProviderBase {
dbsub::setDesiredInstanceCount) || updated;
}
+ private boolean updatePolicies(SessionData data, PdpSubGroup dbsub, PdpSubGroup subgrp) throws PfModelException {
+ Set<ToscaPolicyIdentifier> undeployed = new HashSet<>(dbsub.getPolicies());
+ undeployed.removeAll(subgrp.getPolicies());
+
+ Set<ToscaPolicyIdentifier> deployed = new HashSet<>(subgrp.getPolicies());
+ deployed.removeAll(dbsub.getPolicies());
+
+ if (deployed.isEmpty() && undeployed.isEmpty()) {
+ // lists are identical
+ return false;
+ }
+
+
+ Set<String> pdps = subgrp.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet());
+
+ for (ToscaPolicyIdentifier policyId : deployed) {
+ data.trackDeploy(policyId, pdps);
+ }
+
+ for (ToscaPolicyIdentifier policyId : undeployed) {
+ data.trackUndeploy(policyId, pdps);
+ }
+
+ dbsub.setPolicies(new ArrayList<>(subgrp.getPolicies()));
+ return true;
+ }
+
/**
* Performs additional validations of a subgroup.
*
* @param data session data
* @param dbsub the subgroup, from the DB
- * @param subgrp the subgroup to be validated, updated to fully qualified versions upon return
+ * @param subgrp the subgroup to be validated, updated to fully qualified versions
+ * upon return
* @param container container for additional validation results
* @return {@code true} if the subgroup is valid, {@code false} otherwise
* @throws PfModelException if an error occurred
@@ -455,7 +502,8 @@ public class PdpGroupDeployProvider extends ProviderBase {
private ValidationResult validatePolicies(SessionData data, PdpSubGroup dbsub, PdpSubGroup subgrp)
throws PfModelException {
- // build a map of the DB data, from policy name to (fully qualified) policy version
+ // build a map of the DB data, from policy name to (fully qualified) policy
+ // version
Map<String, String> dbname2vers = new HashMap<>();
if (dbsub != null) {
dbsub.getPolicies().forEach(ident -> dbname2vers.put(ident.getName(), ident.getVersion()));
@@ -557,7 +605,7 @@ public class PdpGroupDeployProvider extends ProviderBase {
* Adds a policy to a subgroup, if it isn't there already.
*/
@Override
- protected BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy,
+ protected Updater makeUpdater(SessionData data, ToscaPolicy policy,
ToscaPolicyIdentifierOptVersion requestedIdent) {
ToscaPolicyIdentifier desiredIdent = policy.getIdentifier();
@@ -586,6 +634,10 @@ public class PdpGroupDeployProvider extends ProviderBase {
logger.info("add policy {} {} to subgroup {} {} count={}", desiredIdent.getName(),
desiredIdent.getVersion(), group.getName(), subgroup.getPdpType(),
subgroup.getPolicies().size());
+
+ Set<String> pdps = subgroup.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet());
+ data.trackDeploy(desiredIdent, pdps);
+
return true;
};
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java
index 70ccd7ab..fd8edf17 100644
--- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java
+++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java
@@ -21,17 +21,13 @@
package org.onap.policy.pap.main.rest.depundep;
import java.util.Collection;
-import java.util.Collections;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response.Status;
-import org.apache.commons.lang3.tuple.Pair;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.base.PfModelRuntimeException;
import org.onap.policy.models.pdp.concepts.Pdp;
import org.onap.policy.models.pdp.concepts.PdpGroup;
-import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
import org.onap.policy.models.provider.PolicyModelsProvider;
@@ -41,6 +37,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifi
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +67,11 @@ public abstract class ProviderBase {
private final PdpModifyRequestMap requestMap;
/**
+ * Generates policy notifications based on responses from PDPs.
+ */
+ private final PolicyNotifier notifier;
+
+ /**
* Factory for PAP DAO.
*/
private final PolicyModelsProviderFactoryWrapper daoFactory;
@@ -82,6 +84,7 @@ public abstract class ProviderBase {
this.updateLock = Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class);
this.requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class);
this.daoFactory = Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
+ this.notifier = Registry.get(PapConstants.REG_POLICY_NOTIFIER, PolicyNotifier.class);
}
/**
@@ -94,19 +97,16 @@ public abstract class ProviderBase {
protected <T> void process(T request, BiConsumerWithEx<SessionData, T> processor) throws PfModelException {
synchronized (updateLock) {
- // list of requests to be published to the PDPs
- Collection<Pair<PdpUpdate, PdpStateChange>> requests = Collections.emptyList();
+ SessionData data;
try (PolicyModelsProvider dao = daoFactory.create()) {
- SessionData data = new SessionData(dao);
+ data = new SessionData(dao);
processor.accept(data, request);
// make all of the DB updates
data.updateDb();
- requests = data.getPdpRequests();
-
} catch (PfModelException | PfModelRuntimeException e) {
logger.warn(DEPLOY_FAILED, e);
throw e;
@@ -116,9 +116,12 @@ public abstract class ProviderBase {
throw new PfModelException(Status.INTERNAL_SERVER_ERROR, "request failed", e);
}
+ // track responses for notification purposes
+ data.getDeployData().forEach(notifier::addDeploymentData);
+ data.getUndeployData().forEach(notifier::addUndeploymentData);
// publish the requests
- requests.forEach(pair -> requestMap.addRequest(pair.getLeft(), pair.getRight()));
+ data.getPdpRequests().forEach(pair -> requestMap.addRequest(pair.getLeft(), pair.getRight()));
}
}
@@ -140,7 +143,7 @@ public abstract class ProviderBase {
+ desiredPolicy.getName() + " " + desiredPolicy.getVersion());
}
- BiFunction<PdpGroup, PdpSubGroup, Boolean> updater = makeUpdater(policy, desiredPolicy);
+ Updater updater = makeUpdater(data, policy, desiredPolicy);
for (PdpGroup group : groups) {
upgradeGroup(data, group, updater);
@@ -152,11 +155,12 @@ public abstract class ProviderBase {
* {@code true} if the subgroup was updated, {@code false} if no update was
* necessary/appropriate.
*
+ * @param data session data
* @param policy policy to be added to or removed from each subgroup
* @param desiredPolicy request policy
* @return a function to update a subgroup
*/
- protected abstract BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy,
+ protected abstract Updater makeUpdater(SessionData data, ToscaPolicy policy,
ToscaPolicyIdentifierOptVersion desiredPolicy);
/**
@@ -180,9 +184,9 @@ public abstract class ProviderBase {
* @param data session data
* @param group the original group, to be updated
* @param updater function to update a group
- * @throws PfModelRuntimeException if an error occurred
+ * @throws PfModelException if an error occurred
*/
- private void upgradeGroup(SessionData data, PdpGroup group, BiFunction<PdpGroup, PdpSubGroup, Boolean> updater) {
+ private void upgradeGroup(SessionData data, PdpGroup group, Updater updater) throws PfModelException {
boolean updated = false;
@@ -275,4 +279,9 @@ public abstract class ProviderBase {
*/
void accept(F firstArg, S secondArg) throws PfModelException;
}
+
+ @FunctionalInterface
+ public static interface Updater {
+ boolean apply(PdpGroup group, PdpSubGroup subgroup) throws PfModelException;
+ }
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java
index 437d7a11..5b2a8eea 100644
--- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java
+++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java
@@ -23,8 +23,10 @@ package org.onap.policy.pap.main.rest.depundep;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
@@ -38,9 +40,11 @@ import org.onap.policy.models.provider.PolicyModelsProvider;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyFilter;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyFilter.ToscaPolicyFilterBuilder;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifierOptVersion;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyType;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifier;
+import org.onap.policy.pap.main.notification.PolicyPdpNotificationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +92,18 @@ public class SessionData {
*/
private final Map<ToscaPolicyTypeIdentifier, ToscaPolicyType> typeCache = new HashMap<>();
+ /**
+ * Policies to be deployed. This is just used to build up the data, which is then
+ * passed to the notifier once the update is "committed".
+ */
+ private final Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> deploy = new HashMap<>();
+
+ /**
+ * Policies to be undeployed. This is just used to build up the data, which is then
+ * passed to the notifier once the update is "committed".
+ */
+ private final Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> undeploy = new HashMap<>();
+
/**
* Constructs the object.
@@ -402,4 +418,90 @@ public class SessionData {
logger.info("deleting DB group {}", group.getName());
dao.deletePdpGroup(group.getName());
}
+
+ /**
+ * Adds policy deployment data.
+ *
+ * @param policyId ID of the policy being deployed
+ * @param pdps PDPs to which the policy is being deployed
+ * @throws PfModelException if an error occurred
+ */
+ protected void trackDeploy(ToscaPolicyIdentifier policyId, Collection<String> pdps) throws PfModelException {
+ trackDeploy(policyId, new HashSet<>(pdps));
+ }
+
+ /**
+ * Adds policy deployment data.
+ *
+ * @param policyId ID of the policy being deployed
+ * @param pdps PDPs to which the policy is being deployed
+ * @throws PfModelException if an error occurred
+ */
+ protected void trackDeploy(ToscaPolicyIdentifier policyId, Set<String> pdps) throws PfModelException {
+ addData(policyId, pdps, deploy, undeploy);
+ }
+
+ /**
+ * Adds policy undeployment data.
+ *
+ * @param policyId ID of the policy being undeployed
+ * @param pdps PDPs to which the policy is being undeployed
+ * @throws PfModelException if an error occurred
+ */
+ protected void trackUndeploy(ToscaPolicyIdentifier policyId, Collection<String> pdps) throws PfModelException {
+ trackUndeploy(policyId, new HashSet<>(pdps));
+ }
+
+ /**
+ * Adds policy undeployment data.
+ *
+ * @param policyId ID of the policy being undeployed
+ * @param pdps PDPs to which the policy is being undeployed
+ * @throws PfModelException if an error occurred
+ */
+ protected void trackUndeploy(ToscaPolicyIdentifier policyId, Set<String> pdps) throws PfModelException {
+ addData(policyId, pdps, undeploy, deploy);
+ }
+
+ /**
+ * Adds policy deployment/undeployment data.
+ *
+ * @param policyId ID of the policy being deployed/undeployed
+ * @param pdps PDPs to which the policy is being deployed/undeployed
+ * @param addMap map to which it should be added
+ * @param removeMap map from which it should be removed
+ * @throws PfModelException if an error occurred
+ */
+ private void addData(ToscaPolicyIdentifier policyId, Set<String> pdps,
+ Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> addMap,
+ Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> removeMap) throws PfModelException {
+
+ PolicyPdpNotificationData removeData = removeMap.get(policyId);
+ if (removeData != null) {
+ removeData.removeAll(pdps);
+ }
+
+ ToscaPolicyIdentifierOptVersion optid = new ToscaPolicyIdentifierOptVersion(policyId);
+ ToscaPolicyTypeIdentifier policyType = getPolicy(optid).getTypeIdentifier();
+
+ addMap.computeIfAbsent(policyId, key -> new PolicyPdpNotificationData(policyId, policyType)).addAll(pdps);
+ }
+
+ /**
+ * Gets the policies to be deployed.
+ *
+ * @return the policies to be deployed
+ */
+ public Collection<PolicyPdpNotificationData> getDeployData() {
+ return deploy.values();
+ }
+
+ /**
+ * Gets the policies to be undeployed.
+ *
+ * @return the policies to be undeployed
+ */
+ public Collection<PolicyPdpNotificationData> getUndeployData() {
+ return undeploy.values();
+ }
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
index d119044f..8d2fd3ea 100644
--- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
+++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
@@ -31,6 +31,8 @@ import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.pap.main.PapConstants;
@@ -41,6 +43,7 @@ import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
import org.onap.policy.pap.main.comm.PdpTracker;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.PapParameterGroup;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
import org.onap.policy.pap.main.parameters.PdpParameters;
@@ -112,13 +115,15 @@ public class PapActivator extends ServiceManagerContainer {
final Object pdpUpdateLock = new Object();
final PdpParameters pdpParams = papParameterGroup.getPdpParameters();
- final AtomicReference<Publisher> pdpPub = new AtomicReference<>();
+ final AtomicReference<Publisher<PdpMessage>> pdpPub = new AtomicReference<>();
+ final AtomicReference<Publisher<PolicyNotification>> notifyPub = new AtomicReference<>();
final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
final AtomicReference<TimerManager> heartBeatTimers = new AtomicReference<>();
final AtomicReference<PolicyModelsProviderFactoryWrapper> daoFactory = new AtomicReference<>();
final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>();
final AtomicReference<RestServer> restServer = new AtomicReference<>();
+ final AtomicReference<PolicyNotifier> notifier = new AtomicReference<>();
// @formatter:off
addAction("PAP parameters",
@@ -156,11 +161,23 @@ public class PapActivator extends ServiceManagerContainer {
addAction("PDP publisher",
() -> {
- pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP));
+ pdpPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP));
startThread(pdpPub.get());
},
() -> pdpPub.get().stop());
+ addAction("Policy Notification publisher",
+ () -> {
+ notifyPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_NOTIFICATION));
+ startThread(notifyPub.get());
+ notifier.set(new PolicyNotifier(notifyPub.get()));
+ },
+ () -> notifyPub.get().stop());
+
+ addAction("Policy Notifier",
+ () -> Registry.register(PapConstants.REG_POLICY_NOTIFIER, notifier.get()),
+ () -> Registry.unregister(PapConstants.REG_POLICY_NOTIFIER));
+
addAction("PDP heart beat timers",
() -> {
long maxWaitHeartBeatMs = MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs();
@@ -194,7 +211,8 @@ public class PapActivator extends ServiceManagerContainer {
.setDaoFactory(daoFactory.get())
.setModifyLock(pdpUpdateLock)
.setParams(pdpParams)
- .setPublisher(pdpPub.get())
+ .setPolicyNotifier(notifier.get())
+ .setPdpPublisher(pdpPub.get())
.setResponseDispatcher(reqIdDispatcher)
.setStateChangeTimers(pdpStChgTimers.get())
.setUpdateTimers(pdpUpdTimers.get())));