diff options
author | jh7358 <jh7358@att.com> | 2019-10-16 11:51:46 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2019-10-29 11:09:10 -0400 |
commit | 4f3fe21aa98a196ef1ddd62e1ac84a6c2e4c13cf (patch) | |
tree | 682a1ce5779a094318a0143aeb993ffb4d3ff59d /main/src/main | |
parent | 45c8e61888711a3d6f5913d1d7ddf640ff995b12 (diff) |
Generate notifications when policies change
Updated existing PAP code to make use of new notification classes.
Change-Id: I4637ad92ac4f432f215cfc837e672c75074d88b5
Issue-ID: POLICY-1841
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'main/src/main')
14 files changed, 292 insertions, 49 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 3fc36f35..b8b199e6 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -31,10 +31,12 @@ public class PapConstants { public static final String REG_PDP_MODIFY_LOCK = "lock:pdp"; public static final String REG_PDP_MODIFY_MAP = "object:pdp/modify/map"; public static final String REG_PDP_TRACKER = "object:pdp/tracker"; + public static final String REG_POLICY_NOTIFIER = "object:policy/notifier"; public static final String REG_PAP_DAO_FACTORY = "object:pap/dao/factory"; // topic names public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP"; + public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION"; private PapConstants() { super(); diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java index 3f0b5c11..f058da32 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java @@ -41,6 +41,7 @@ import org.onap.policy.pap.main.comm.msgdata.Request; import org.onap.policy.pap.main.comm.msgdata.RequestListener; import org.onap.policy.pap.main.comm.msgdata.StateChangeReq; import org.onap.policy.pap.main.comm.msgdata.UpdateReq; +import org.onap.policy.pap.main.notification.PolicyNotifier; import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; import org.onap.policy.pap.main.parameters.RequestParams; import org.slf4j.Logger; @@ -74,6 +75,11 @@ public class PdpModifyRequestMap { */ private final PolicyModelsProviderFactoryWrapper daoFactory; + /** + * Used to notify when policy updates completes. + */ + private final PolicyNotifier policyNotifier; + /** * Constructs the object. @@ -88,6 +94,7 @@ public class PdpModifyRequestMap { this.params = params; this.modifyLock = params.getModifyLock(); this.daoFactory = params.getDaoFactory(); + this.policyNotifier = params.getPolicyNotifier(); } /** @@ -150,7 +157,7 @@ public class PdpModifyRequestMap { .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount()) .setTimers(params.getUpdateTimers()) .setModifyLock(params.getModifyLock()) - .setPublisher(params.getPublisher()) + .setPdpPublisher(params.getPdpPublisher()) .setResponseDispatcher(params.getResponseDispatcher()); // @formatter:on @@ -179,7 +186,7 @@ public class PdpModifyRequestMap { .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount()) .setTimers(params.getStateChangeTimers()) .setModifyLock(params.getModifyLock()) - .setPublisher(params.getPublisher()) + .setPdpPublisher(params.getPdpPublisher()) .setResponseDispatcher(params.getResponseDispatcher()); // @formatter:on @@ -299,7 +306,7 @@ public class PdpModifyRequestMap { * @return a new set of requests */ protected PdpRequests makePdpRequests(String pdpName) { - return new PdpRequests(pdpName); + return new PdpRequests(pdpName, policyNotifier); } /** @@ -359,6 +366,8 @@ public class PdpModifyRequestMap { */ private void disablePdp(PdpRequests requests) { + policyNotifier.removePdp(requests.getPdpName()); + // remove the requests from the map if (!pdp2requests.remove(requests.getPdpName(), requests)) { // don't have the info we need to disable it diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java index 5863b2cb..4e1e9233 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java @@ -23,6 +23,7 @@ package org.onap.policy.pap.main.comm; import lombok.Getter; import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.pap.main.comm.msgdata.Request; +import org.onap.policy.pap.main.notification.PolicyNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,12 @@ public class PdpRequests { private final String pdpName; /** + * Notifier for policy update completions. + */ + @Getter + private final PolicyNotifier notifier; + + /** * Index of request currently being published. */ private int curIndex = 0; @@ -60,8 +67,9 @@ public class PdpRequests { * * @param pdpName name of the PDP with which the requests are associated */ - public PdpRequests(String pdpName) { + public PdpRequests(String pdpName, PolicyNotifier notifier) { this.pdpName = pdpName; + this.notifier = notifier; } /** @@ -71,6 +79,8 @@ public class PdpRequests { */ public void addSingleton(Request request) { + request.setNotifier(notifier); + if (request.getMessage().getName() == null) { throw new IllegalArgumentException("unexpected broadcast for " + pdpName); } @@ -86,7 +96,7 @@ public class PdpRequests { singletons[priority] = request; // stop publishing anything of a lower priority - QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority); + final QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority); // start publishing if nothing of higher priority if (higherPrioritySingleton(priority)) { diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java index 1f69dcb5..62aea789 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java @@ -23,6 +23,7 @@ package org.onap.policy.pap.main.comm.msgdata; import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.comm.QueueToken; +import org.onap.policy.pap.main.notification.PolicyNotifier; /** * Request data, whose message may be changed at any point, possibly triggering a restart @@ -61,6 +62,13 @@ public interface Request { public void setListener(RequestListener listener); /** + * Sets the notifier to track responses to the request. + * + * @param notifier notifier used to publish notifications + */ + public void setNotifier(PolicyNotifier notifier); + + /** * Determines if this request is currently being published. * * @return {@code true} if this request is being published, {@code false} otherwise diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java index 1945b32d..a110ef44 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java @@ -30,6 +30,7 @@ import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.comm.QueueToken; import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.notification.PolicyNotifier; import org.onap.policy.pap.main.parameters.RequestParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +65,13 @@ public abstract class RequestImpl implements Request { private RequestListener listener; /** + * Notifier for policy update completions. + */ + @Getter + @Setter + private PolicyNotifier notifier; + + /** * Current retry count. */ @Getter @@ -217,7 +225,7 @@ public abstract class RequestImpl implements Request { // couldn't take the other's place - add our own token to the queue token = new QueueToken<>(message); - params.getPublisher().enqueue(token); + params.getPdpPublisher().enqueue(token); } /** diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java index e38cb0fd..40acd3ad 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java @@ -20,7 +20,6 @@ package org.onap.policy.pap.main.comm.msgdata; -import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStateChange; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.parameters.RequestParams; @@ -39,7 +38,7 @@ public class StateChangeReq extends RequestImpl { * * @throws IllegalArgumentException if a required parameter is not set */ - public StateChangeReq(RequestParams params, String name, PdpMessage message) { + public StateChangeReq(RequestParams params, String name, PdpStateChange message) { super(params, name, message); } diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java index 8efdb7ca..6b04e726 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.concepts.PdpUpdate; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; @@ -48,7 +47,7 @@ public class UpdateReq extends RequestImpl { * * @throws IllegalArgumentException if a required parameter is not set */ - public UpdateReq(RequestParams params, String name, PdpMessage message) { + public UpdateReq(RequestParams params, String name, PdpUpdate message) { super(params, name, message); } @@ -61,10 +60,15 @@ public class UpdateReq extends RequestImpl { public String checkResponse(PdpStatus response) { String reason = super.checkResponse(response); if (reason != null) { + // response isn't for this PDP - don't generate notifications return reason; } + Set<ToscaPolicyIdentifier> actualSet = new HashSet<>(alwaysList(response.getPolicies())); + getNotifier().processResponse(getName(), actualSet); + PdpUpdate message = getMessage(); + if (!StringUtils.equals(message.getPdpGroup(), response.getPdpGroup())) { return "group does not match"; } @@ -74,11 +78,11 @@ public class UpdateReq extends RequestImpl { } // see if the policies match - Set<ToscaPolicyIdentifier> set1 = new HashSet<>(alwaysList(response.getPolicies())); - Set<ToscaPolicyIdentifier> set2 = new HashSet<>(alwaysList(message.getPolicies()).stream() + + Set<ToscaPolicyIdentifier> expectedSet = new HashSet<>(alwaysList(message.getPolicies()).stream() .map(ToscaPolicy::getIdentifier).collect(Collectors.toSet())); - if (!set1.equals(set2)) { + if (!actualSet.equals(expectedSet)) { return "policies do not match"; } diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java index 2f74bf3d..9a3e7a45 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java @@ -22,10 +22,12 @@ package org.onap.policy.pap.main.parameters; import lombok.Getter; import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.notification.PolicyNotifier; /** @@ -33,13 +35,14 @@ import org.onap.policy.pap.main.comm.TimerManager; */ @Getter public class PdpModifyRequestMapParams { - private Publisher publisher; + private Publisher<PdpMessage> pdpPublisher; private RequestIdDispatcher<PdpStatus> responseDispatcher; private Object modifyLock; private PdpParameters params; private TimerManager updateTimers; private TimerManager stateChangeTimers; private PolicyModelsProviderFactoryWrapper daoFactory; + private PolicyNotifier policyNotifier; public PdpModifyRequestMapParams setParams(PdpParameters params) { this.params = params; @@ -61,8 +64,13 @@ public class PdpModifyRequestMapParams { return this; } - public PdpModifyRequestMapParams setPublisher(Publisher publisher) { - this.publisher = publisher; + public PdpModifyRequestMapParams setPolicyNotifier(PolicyNotifier policyNotifier) { + this.policyNotifier = policyNotifier; + return this; + } + + public PdpModifyRequestMapParams setPdpPublisher(Publisher<PdpMessage> pdpPublisher) { + this.pdpPublisher = pdpPublisher; return this; } @@ -80,7 +88,7 @@ public class PdpModifyRequestMapParams { * Validates the parameters. */ public void validate() { - if (publisher == null) { + if (pdpPublisher == null) { throw new IllegalArgumentException("missing publisher"); } @@ -103,5 +111,13 @@ public class PdpModifyRequestMapParams { if (stateChangeTimers == null) { throw new IllegalArgumentException("missing stateChangeTimers"); } + + if (daoFactory == null) { + throw new IllegalArgumentException("missing DAO factory"); + } + + if (policyNotifier == null) { + throw new IllegalArgumentException("missing policy notifier"); + } } } diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java index b9083864..57d4ddaa 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java @@ -22,6 +22,7 @@ package org.onap.policy.pap.main.parameters; import lombok.Getter; import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; @@ -32,15 +33,15 @@ import org.onap.policy.pap.main.comm.TimerManager; */ @Getter public class RequestParams { - private Publisher publisher; + private Publisher<PdpMessage> pdpPublisher; private RequestIdDispatcher<PdpStatus> responseDispatcher; private Object modifyLock; private TimerManager timers; private int maxRetryCount; - public RequestParams setPublisher(Publisher publisher) { - this.publisher = publisher; + public RequestParams setPdpPublisher(Publisher<PdpMessage> publisher) { + this.pdpPublisher = publisher; return this; } @@ -68,7 +69,7 @@ public class RequestParams { * Validates the parameters. */ public void validate() { - if (publisher == null) { + if (pdpPublisher == null) { throw new IllegalArgumentException("missing publisher"); } diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java index 0ca32451..09ad862b 100644 --- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java +++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeleteProvider.java @@ -21,12 +21,13 @@ package org.onap.policy.pap.main.rest.depundep; import java.util.Iterator; -import java.util.function.BiFunction; +import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.ws.rs.core.Response.Status; import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; import org.onap.policy.models.pdp.concepts.PdpGroup; -import org.onap.policy.models.pdp.concepts.PdpSubGroup; import org.onap.policy.models.pdp.enums.PdpState; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; @@ -122,7 +123,7 @@ public class PdpGroupDeleteProvider extends ProviderBase { * Returns a function that will remove the desired policy from a subgroup. */ @Override - protected BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy, + protected Updater makeUpdater(SessionData data, ToscaPolicy policy, ToscaPolicyIdentifierOptVersion desiredIdent) { // construct a matcher based on whether or not the version was specified @@ -142,6 +143,8 @@ public class PdpGroupDeleteProvider extends ProviderBase { // return a function that will remove the policy from the subgroup return (group, subgroup) -> { + Set<String> pdps = subgroup.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet()); + boolean result = false; Iterator<ToscaPolicyIdentifier> iter = subgroup.getPolicies().iterator(); @@ -153,6 +156,8 @@ public class PdpGroupDeleteProvider extends ProviderBase { iter.remove(); logger.info("remove policy {} {} from subgroup {} {} count={}", ident.getName(), ident.getVersion(), group.getName(), subgroup.getPdpType(), subgroup.getPolicies().size()); + + data.trackUndeploy(ident, pdps); } } diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java index bc3148ef..1e00528e 100644 --- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java +++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/PdpGroupDeployProvider.java @@ -28,8 +28,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.Collectors; import javax.ws.rs.core.Response.Status; import org.onap.policy.common.parameters.BeanValidationResult; import org.onap.policy.common.parameters.ObjectValidationResult; @@ -269,8 +269,9 @@ public class PdpGroupDeployProvider extends ProviderBase { * @param dbgroup the group, as it appears within the DB * @param group the group being updated * @return {@code true} if a subgroup was removed, {@code false} otherwise + * @throws PfModelException if an error occurred */ - private boolean notifyPdpsDelSubGroups(SessionData data, PdpGroup dbgroup, PdpGroup group) { + private boolean notifyPdpsDelSubGroups(SessionData data, PdpGroup dbgroup, PdpGroup group) throws PfModelException { boolean updated = false; // subgroups, as they appear within the updated group @@ -284,6 +285,7 @@ public class PdpGroupDeployProvider extends ProviderBase { // this subgroup no longer appears - notify its PDPs updated = true; notifyPdpsDelSubGroup(data, subgrp); + trackPdpsDelSubGroup(data, subgrp); } } @@ -314,10 +316,26 @@ public class PdpGroupDeployProvider extends ProviderBase { } /** + * Tracks PDP responses when their subgroup is removed. + * + * @param data session data + * @param subgrp subgroup that is being removed + * @throws PfModelException if an error occurred + */ + private void trackPdpsDelSubGroup(SessionData data, PdpSubGroup subgrp) throws PfModelException { + Set<String> pdps = subgrp.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet()); + + for (ToscaPolicyIdentifier policyId : subgrp.getPolicies()) { + data.trackUndeploy(policyId, pdps); + } + } + + /** * Adds a new subgroup. * * @param data session data - * @param subgrp the subgroup to be added, updated to fully qualified versions upon return + * @param subgrp the subgroup to be added, updated to fully qualified versions upon + * return * @return the validation result * @throws PfModelException if an error occurred */ @@ -339,7 +357,8 @@ public class PdpGroupDeployProvider extends ProviderBase { * @param data session data * @param dbgroup the group, from the DB, containing the subgroup * @param dbsub the subgroup, from the DB - * @param subgrp the subgroup to be updated, updated to fully qualified versions upon return + * @param subgrp the subgroup to be updated, updated to fully qualified versions upon + * return * @param container container for additional validation results * @return {@code true} if the subgroup content was changed, {@code false} if there * were no changes @@ -356,7 +375,7 @@ public class PdpGroupDeployProvider extends ProviderBase { /* * first, apply the changes about which the PDPs care */ - boolean updated = updateList(dbsub.getPolicies(), subgrp.getPolicies(), dbsub::setPolicies); + boolean updated = updatePolicies(data, dbsub, subgrp); // publish any changes to the PDPs if (updated) { @@ -373,12 +392,40 @@ public class PdpGroupDeployProvider extends ProviderBase { dbsub::setDesiredInstanceCount) || updated; } + private boolean updatePolicies(SessionData data, PdpSubGroup dbsub, PdpSubGroup subgrp) throws PfModelException { + Set<ToscaPolicyIdentifier> undeployed = new HashSet<>(dbsub.getPolicies()); + undeployed.removeAll(subgrp.getPolicies()); + + Set<ToscaPolicyIdentifier> deployed = new HashSet<>(subgrp.getPolicies()); + deployed.removeAll(dbsub.getPolicies()); + + if (deployed.isEmpty() && undeployed.isEmpty()) { + // lists are identical + return false; + } + + + Set<String> pdps = subgrp.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet()); + + for (ToscaPolicyIdentifier policyId : deployed) { + data.trackDeploy(policyId, pdps); + } + + for (ToscaPolicyIdentifier policyId : undeployed) { + data.trackUndeploy(policyId, pdps); + } + + dbsub.setPolicies(new ArrayList<>(subgrp.getPolicies())); + return true; + } + /** * Performs additional validations of a subgroup. * * @param data session data * @param dbsub the subgroup, from the DB - * @param subgrp the subgroup to be validated, updated to fully qualified versions upon return + * @param subgrp the subgroup to be validated, updated to fully qualified versions + * upon return * @param container container for additional validation results * @return {@code true} if the subgroup is valid, {@code false} otherwise * @throws PfModelException if an error occurred @@ -455,7 +502,8 @@ public class PdpGroupDeployProvider extends ProviderBase { private ValidationResult validatePolicies(SessionData data, PdpSubGroup dbsub, PdpSubGroup subgrp) throws PfModelException { - // build a map of the DB data, from policy name to (fully qualified) policy version + // build a map of the DB data, from policy name to (fully qualified) policy + // version Map<String, String> dbname2vers = new HashMap<>(); if (dbsub != null) { dbsub.getPolicies().forEach(ident -> dbname2vers.put(ident.getName(), ident.getVersion())); @@ -557,7 +605,7 @@ public class PdpGroupDeployProvider extends ProviderBase { * Adds a policy to a subgroup, if it isn't there already. */ @Override - protected BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy, + protected Updater makeUpdater(SessionData data, ToscaPolicy policy, ToscaPolicyIdentifierOptVersion requestedIdent) { ToscaPolicyIdentifier desiredIdent = policy.getIdentifier(); @@ -586,6 +634,10 @@ public class PdpGroupDeployProvider extends ProviderBase { logger.info("add policy {} {} to subgroup {} {} count={}", desiredIdent.getName(), desiredIdent.getVersion(), group.getName(), subgroup.getPdpType(), subgroup.getPolicies().size()); + + Set<String> pdps = subgroup.getPdpInstances().stream().map(Pdp::getInstanceId).collect(Collectors.toSet()); + data.trackDeploy(desiredIdent, pdps); + return true; }; } diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java index 70ccd7ab..fd8edf17 100644 --- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java +++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/ProviderBase.java @@ -21,17 +21,13 @@ package org.onap.policy.pap.main.rest.depundep; import java.util.Collection; -import java.util.Collections; -import java.util.function.BiFunction; import java.util.stream.Collectors; import javax.ws.rs.core.Response.Status; -import org.apache.commons.lang3.tuple.Pair; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.pdp.concepts.Pdp; import org.onap.policy.models.pdp.concepts.PdpGroup; -import org.onap.policy.models.pdp.concepts.PdpStateChange; import org.onap.policy.models.pdp.concepts.PdpSubGroup; import org.onap.policy.models.pdp.concepts.PdpUpdate; import org.onap.policy.models.provider.PolicyModelsProvider; @@ -41,6 +37,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifi import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.comm.PdpModifyRequestMap; +import org.onap.policy.pap.main.notification.PolicyNotifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +67,11 @@ public abstract class ProviderBase { private final PdpModifyRequestMap requestMap; /** + * Generates policy notifications based on responses from PDPs. + */ + private final PolicyNotifier notifier; + + /** * Factory for PAP DAO. */ private final PolicyModelsProviderFactoryWrapper daoFactory; @@ -82,6 +84,7 @@ public abstract class ProviderBase { this.updateLock = Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class); this.requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class); this.daoFactory = Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); + this.notifier = Registry.get(PapConstants.REG_POLICY_NOTIFIER, PolicyNotifier.class); } /** @@ -94,19 +97,16 @@ public abstract class ProviderBase { protected <T> void process(T request, BiConsumerWithEx<SessionData, T> processor) throws PfModelException { synchronized (updateLock) { - // list of requests to be published to the PDPs - Collection<Pair<PdpUpdate, PdpStateChange>> requests = Collections.emptyList(); + SessionData data; try (PolicyModelsProvider dao = daoFactory.create()) { - SessionData data = new SessionData(dao); + data = new SessionData(dao); processor.accept(data, request); // make all of the DB updates data.updateDb(); - requests = data.getPdpRequests(); - } catch (PfModelException | PfModelRuntimeException e) { logger.warn(DEPLOY_FAILED, e); throw e; @@ -116,9 +116,12 @@ public abstract class ProviderBase { throw new PfModelException(Status.INTERNAL_SERVER_ERROR, "request failed", e); } + // track responses for notification purposes + data.getDeployData().forEach(notifier::addDeploymentData); + data.getUndeployData().forEach(notifier::addUndeploymentData); // publish the requests - requests.forEach(pair -> requestMap.addRequest(pair.getLeft(), pair.getRight())); + data.getPdpRequests().forEach(pair -> requestMap.addRequest(pair.getLeft(), pair.getRight())); } } @@ -140,7 +143,7 @@ public abstract class ProviderBase { + desiredPolicy.getName() + " " + desiredPolicy.getVersion()); } - BiFunction<PdpGroup, PdpSubGroup, Boolean> updater = makeUpdater(policy, desiredPolicy); + Updater updater = makeUpdater(data, policy, desiredPolicy); for (PdpGroup group : groups) { upgradeGroup(data, group, updater); @@ -152,11 +155,12 @@ public abstract class ProviderBase { * {@code true} if the subgroup was updated, {@code false} if no update was * necessary/appropriate. * + * @param data session data * @param policy policy to be added to or removed from each subgroup * @param desiredPolicy request policy * @return a function to update a subgroup */ - protected abstract BiFunction<PdpGroup, PdpSubGroup, Boolean> makeUpdater(ToscaPolicy policy, + protected abstract Updater makeUpdater(SessionData data, ToscaPolicy policy, ToscaPolicyIdentifierOptVersion desiredPolicy); /** @@ -180,9 +184,9 @@ public abstract class ProviderBase { * @param data session data * @param group the original group, to be updated * @param updater function to update a group - * @throws PfModelRuntimeException if an error occurred + * @throws PfModelException if an error occurred */ - private void upgradeGroup(SessionData data, PdpGroup group, BiFunction<PdpGroup, PdpSubGroup, Boolean> updater) { + private void upgradeGroup(SessionData data, PdpGroup group, Updater updater) throws PfModelException { boolean updated = false; @@ -275,4 +279,9 @@ public abstract class ProviderBase { */ void accept(F firstArg, S secondArg) throws PfModelException; } + + @FunctionalInterface + public static interface Updater { + boolean apply(PdpGroup group, PdpSubGroup subgroup) throws PfModelException; + } } diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java index 437d7a11..5b2a8eea 100644 --- a/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java +++ b/main/src/main/java/org/onap/policy/pap/main/rest/depundep/SessionData.java @@ -23,8 +23,10 @@ package org.onap.policy.pap.main.rest.depundep; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; @@ -38,9 +40,11 @@ import org.onap.policy.models.provider.PolicyModelsProvider; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyFilter; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyFilter.ToscaPolicyFilterBuilder; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifierOptVersion; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyType; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifier; +import org.onap.policy.pap.main.notification.PolicyPdpNotificationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +92,18 @@ public class SessionData { */ private final Map<ToscaPolicyTypeIdentifier, ToscaPolicyType> typeCache = new HashMap<>(); + /** + * Policies to be deployed. This is just used to build up the data, which is then + * passed to the notifier once the update is "committed". + */ + private final Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> deploy = new HashMap<>(); + + /** + * Policies to be undeployed. This is just used to build up the data, which is then + * passed to the notifier once the update is "committed". + */ + private final Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> undeploy = new HashMap<>(); + /** * Constructs the object. @@ -402,4 +418,90 @@ public class SessionData { logger.info("deleting DB group {}", group.getName()); dao.deletePdpGroup(group.getName()); } + + /** + * Adds policy deployment data. + * + * @param policyId ID of the policy being deployed + * @param pdps PDPs to which the policy is being deployed + * @throws PfModelException if an error occurred + */ + protected void trackDeploy(ToscaPolicyIdentifier policyId, Collection<String> pdps) throws PfModelException { + trackDeploy(policyId, new HashSet<>(pdps)); + } + + /** + * Adds policy deployment data. + * + * @param policyId ID of the policy being deployed + * @param pdps PDPs to which the policy is being deployed + * @throws PfModelException if an error occurred + */ + protected void trackDeploy(ToscaPolicyIdentifier policyId, Set<String> pdps) throws PfModelException { + addData(policyId, pdps, deploy, undeploy); + } + + /** + * Adds policy undeployment data. + * + * @param policyId ID of the policy being undeployed + * @param pdps PDPs to which the policy is being undeployed + * @throws PfModelException if an error occurred + */ + protected void trackUndeploy(ToscaPolicyIdentifier policyId, Collection<String> pdps) throws PfModelException { + trackUndeploy(policyId, new HashSet<>(pdps)); + } + + /** + * Adds policy undeployment data. + * + * @param policyId ID of the policy being undeployed + * @param pdps PDPs to which the policy is being undeployed + * @throws PfModelException if an error occurred + */ + protected void trackUndeploy(ToscaPolicyIdentifier policyId, Set<String> pdps) throws PfModelException { + addData(policyId, pdps, undeploy, deploy); + } + + /** + * Adds policy deployment/undeployment data. + * + * @param policyId ID of the policy being deployed/undeployed + * @param pdps PDPs to which the policy is being deployed/undeployed + * @param addMap map to which it should be added + * @param removeMap map from which it should be removed + * @throws PfModelException if an error occurred + */ + private void addData(ToscaPolicyIdentifier policyId, Set<String> pdps, + Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> addMap, + Map<ToscaPolicyIdentifier, PolicyPdpNotificationData> removeMap) throws PfModelException { + + PolicyPdpNotificationData removeData = removeMap.get(policyId); + if (removeData != null) { + removeData.removeAll(pdps); + } + + ToscaPolicyIdentifierOptVersion optid = new ToscaPolicyIdentifierOptVersion(policyId); + ToscaPolicyTypeIdentifier policyType = getPolicy(optid).getTypeIdentifier(); + + addMap.computeIfAbsent(policyId, key -> new PolicyPdpNotificationData(policyId, policyType)).addAll(pdps); + } + + /** + * Gets the policies to be deployed. + * + * @return the policies to be deployed + */ + public Collection<PolicyPdpNotificationData> getDeployData() { + return deploy.values(); + } + + /** + * Gets the policies to be undeployed. + * + * @return the policies to be undeployed + */ + public Collection<PolicyPdpNotificationData> getUndeployData() { + return undeploy.values(); + } } diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index d119044f..8d2fd3ea 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -31,6 +31,8 @@ import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.onap.policy.models.pap.concepts.PolicyNotification; +import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.enums.PdpMessageType; import org.onap.policy.pap.main.PapConstants; @@ -41,6 +43,7 @@ import org.onap.policy.pap.main.comm.PdpModifyRequestMap; import org.onap.policy.pap.main.comm.PdpTracker; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.notification.PolicyNotifier; import org.onap.policy.pap.main.parameters.PapParameterGroup; import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; import org.onap.policy.pap.main.parameters.PdpParameters; @@ -112,13 +115,15 @@ public class PapActivator extends ServiceManagerContainer { final Object pdpUpdateLock = new Object(); final PdpParameters pdpParams = papParameterGroup.getPdpParameters(); - final AtomicReference<Publisher> pdpPub = new AtomicReference<>(); + final AtomicReference<Publisher<PdpMessage>> pdpPub = new AtomicReference<>(); + final AtomicReference<Publisher<PolicyNotification>> notifyPub = new AtomicReference<>(); final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>(); final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>(); final AtomicReference<TimerManager> heartBeatTimers = new AtomicReference<>(); final AtomicReference<PolicyModelsProviderFactoryWrapper> daoFactory = new AtomicReference<>(); final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>(); final AtomicReference<RestServer> restServer = new AtomicReference<>(); + final AtomicReference<PolicyNotifier> notifier = new AtomicReference<>(); // @formatter:off addAction("PAP parameters", @@ -156,11 +161,23 @@ public class PapActivator extends ServiceManagerContainer { addAction("PDP publisher", () -> { - pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP)); + pdpPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP)); startThread(pdpPub.get()); }, () -> pdpPub.get().stop()); + addAction("Policy Notification publisher", + () -> { + notifyPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_NOTIFICATION)); + startThread(notifyPub.get()); + notifier.set(new PolicyNotifier(notifyPub.get())); + }, + () -> notifyPub.get().stop()); + + addAction("Policy Notifier", + () -> Registry.register(PapConstants.REG_POLICY_NOTIFIER, notifier.get()), + () -> Registry.unregister(PapConstants.REG_POLICY_NOTIFIER)); + addAction("PDP heart beat timers", () -> { long maxWaitHeartBeatMs = MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs(); @@ -194,7 +211,8 @@ public class PapActivator extends ServiceManagerContainer { .setDaoFactory(daoFactory.get()) .setModifyLock(pdpUpdateLock) .setParams(pdpParams) - .setPublisher(pdpPub.get()) + .setPolicyNotifier(notifier.get()) + .setPdpPublisher(pdpPub.get()) .setResponseDispatcher(reqIdDispatcher) .setStateChangeTimers(pdpStChgTimers.get()) .setUpdateTimers(pdpUpdTimers.get()))); |