diff options
author | 2019-04-10 18:23:32 +0000 | |
---|---|---|
committer | 2019-04-10 18:23:32 +0000 | |
commit | 2de916125dc485d15bdebc8b9e574b855bafc368 (patch) | |
tree | 011d2a69bc14d92163ac1e8ab25f9338204e0a52 /main/src/main | |
parent | 84da2c8e05ff63163c4431284115c97b29ff1fae (diff) | |
parent | f12eed0a3097518f49731bb722a0063b52d36b2a (diff) |
Merge "Refactor request map"
Diffstat (limited to 'main/src/main')
11 files changed, 1009 insertions, 564 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java index 24443cc2..6a743a31 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java @@ -20,26 +20,43 @@ package org.onap.policy.pap.main.comm; -import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; +import org.onap.policy.models.pdp.concepts.PdpGroup; +import org.onap.policy.models.pdp.concepts.PdpGroupFilter; +import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpSubGroup; import org.onap.policy.models.pdp.concepts.PdpUpdate; -import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; -import org.onap.policy.pap.main.comm.msgdata.StateChangeData; -import org.onap.policy.pap.main.comm.msgdata.UpdateData; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.comm.msgdata.Request; +import org.onap.policy.pap.main.comm.msgdata.RequestListener; +import org.onap.policy.pap.main.comm.msgdata.StateChangeReq; +import org.onap.policy.pap.main.comm.msgdata.UpdateReq; import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.RequestParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Maps a PDP name to requests that modify PDPs. */ public class PdpModifyRequestMap { + private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class); + + private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: "; /** - * Maps a PDP name to its request data. An entry is removed once all of the requests - * within the data have been completed. + * Maps a PDP name to its outstanding requests. */ - private final Map<String, ModifyReqData> name2data = new HashMap<>(); + private final Map<String, PdpRequests> pdp2requests = new HashMap<>(); /** * PDP modification lock. @@ -52,7 +69,13 @@ public class PdpModifyRequestMap { private final PdpModifyRequestMapParams params; /** - * Constructs the data. + * Factory for PAP DAO. + */ + private final PolicyModelsProviderFactoryWrapper daoFactory; + + + /** + * Constructs the object. * * @param params configuration parameters * @@ -63,24 +86,21 @@ public class PdpModifyRequestMap { this.params = params; this.modifyLock = params.getModifyLock(); + this.daoFactory = params.getDaoFactory(); } /** - * Adds an UPDATE request to the map. - * - * @param update the UPDATE request or {@code null} - */ - public void addRequest(PdpUpdate update) { - addRequest(update, null); - } - - /** - * Adds STATE-CHANGE request to the map. + * Stops publishing requests to the given PDP. * - * @param stateChange the STATE-CHANGE request or {@code null} + * @param pdpName PDP name */ - public void addRequest(PdpStateChange stateChange) { - addRequest(null, stateChange); + public void stopPublishing(String pdpName) { + synchronized (modifyLock) { + PdpRequests requests = pdp2requests.remove(pdpName); + if (requests != null) { + requests.stopPublishing(); + } + } } /** @@ -90,288 +110,255 @@ public class PdpModifyRequestMap { * @param stateChange the STATE-CHANGE request or {@code null} */ public void addRequest(PdpUpdate update, PdpStateChange stateChange) { - if (update == null && stateChange == null) { - return; - } + if (update == null) { + addRequest(stateChange); - synchronized (modifyLock) { - String pdpName = getPdpName(update, stateChange); - - ModifyReqData data = name2data.get(pdpName); - if (data != null) { - // update the existing request - data.add(update); - data.add(stateChange); - - } else { - data = makeRequestData(update, stateChange); - name2data.put(pdpName, data); - data.startPublishing(); + } else { + synchronized (modifyLock) { + addRequest(update); + addRequest(stateChange); } } } /** - * Gets the PDP name from two requests. + * Adds an UPDATE request to the map. * - * @param update the update request, or {@code null} - * @param stateChange the state-change request, or {@code null} - * @return the PDP name, or {@code null} if both requests are {@code null} + * @param update the UPDATE request or {@code null} */ - private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) { - String pdpName; + public void addRequest(PdpUpdate update) { + if (update == null) { + return; + } - if (update != null) { - if ((pdpName = update.getName()) == null) { - throw new IllegalArgumentException("missing name in " + update); - } + if (isBroadcast(update)) { + throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update); + } - if (stateChange != null && !pdpName.equals(stateChange.getName())) { - throw new IllegalArgumentException( - "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange); - } + // @formatter:off + RequestParams reqparams = new RequestParams() + .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount()) + .setTimers(params.getUpdateTimers()) + .setModifyLock(params.getModifyLock()) + .setPublisher(params.getPublisher()) + .setResponseDispatcher(params.getResponseDispatcher()); + // @formatter:on - } else { - if ((pdpName = stateChange.getName()) == null) { - throw new IllegalArgumentException("missing name in " + stateChange); - } - } + String name = update.getName() + " " + PdpUpdate.class.getSimpleName(); + UpdateReq request = new UpdateReq(reqparams, name, update); - return pdpName; + addSingleton(request); } /** - * Determines if two requests are the "same", which is does not necessarily mean - * "equals". + * Adds a STATE-CHANGE request to the map. * - * @param first first request to check - * @param second second request to check - * @return {@code true} if the requests are the "same", {@code false} otherwise + * @param stateChange the STATE-CHANGE request or {@code null} */ - protected static boolean isSame(PdpUpdate first, PdpUpdate second) { - if (first.getPolicies().size() != second.getPolicies().size()) { - return false; + public void addRequest(PdpStateChange stateChange) { + if (stateChange == null) { + return; } - if (!first.getPdpGroup().equals(second.getPdpGroup())) { - return false; + if (isBroadcast(stateChange)) { + throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange); } - if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) { - return false; - } + // @formatter:off + RequestParams reqparams = new RequestParams() + .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount()) + .setTimers(params.getStateChangeTimers()) + .setModifyLock(params.getModifyLock()) + .setPublisher(params.getPublisher()) + .setResponseDispatcher(params.getResponseDispatcher()); + // @formatter:on - // see if the other has any policies that this does not have - ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies()); - lst.removeAll(first.getPolicies()); + String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName(); + StateChangeReq request = new StateChangeReq(reqparams, name, stateChange); - return lst.isEmpty(); + addSingleton(request); } /** - * Determines if two requests are the "same", which is does not necessarily mean - * "equals". + * Determines if a message is a broadcast message. * - * @param first first request to check - * @param second second request to check - * @return {@code true} if this update subsumes the other, {@code false} otherwise + * @param message the message to examine + * @return {@code true} if the message is a broadcast message, {@code false} if + * destined for a single PDP */ - protected static boolean isSame(PdpStateChange first, PdpStateChange second) { - return (first.getState() == second.getState()); + private boolean isBroadcast(PdpMessage message) { + return (message.getName() == null); } /** - * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The - * UPDATE is always published before the STATE-CHANGE. In addition, both requests may - * be changed at any point, possibly triggering a restart of the publishing. + * Configures and adds a request to the map. + * + * @param request the request to be added */ - public class ModifyReqData extends RequestData { - - /** - * The UPDATE message to be published, or {@code null}. - */ - private PdpUpdate update; - - /** - * The STATE-CHANGE message to be published, or {@code null}. - */ - private PdpStateChange stateChange; - - - /** - * Constructs the object. - * - * @param newUpdate the UPDATE message to be sent, or {@code null} - * @param newState the STATE-CHANGE message to be sent, or {@code null} - */ - public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) { - super(params); - - if (newUpdate != null) { - this.stateChange = newState; - setName(newUpdate.getName()); - update = newUpdate; - configure(new ModUpdateData(newUpdate)); - - } else { - this.update = null; - setName(newState.getName()); - stateChange = newState; - configure(new ModStateChangeData(newState)); - } - } + private void addSingleton(Request request) { - /** - * Determines if this request is still in the map. - */ - @Override - protected boolean isActive() { - return (name2data.get(getName()) == this); + synchronized (modifyLock) { + PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests); + + request.setListener(new SingletonListener(requests, request)); + requests.addSingleton(request); } + } - /** - * Removes this request from the map. - */ - @Override - protected void allCompleted() { - name2data.remove(getName(), this); + /** + * Starts the next request associated with a PDP. + * + * @param requests current set of requests + * @param request the request that just completed + */ + private void startNextRequest(PdpRequests requests, Request request) { + if (!requests.startNextRequest(request)) { + pdp2requests.remove(requests.getPdpName(), requests); } + } - /** - * Adds an UPDATE to the request data, replacing any existing UPDATE, if - * appropriate. If the UPDATE is replaced, then publishing is restarted. - * - * @param newRequest the new UPDATE request - */ - private void add(PdpUpdate newRequest) { - if (newRequest == null) { - return; - } + /** + * Disables a PDP by removing it from its subgroup and then sending it a PASSIVE + * request. + * + * @param requests the requests associated with the PDP to be disabled + */ + private void disablePdp(PdpRequests requests) { - synchronized (modifyLock) { - if (update != null && isSame(update, newRequest)) { - // already have this update - discard it - return; - } + // remove the requests from the map + if (!pdp2requests.remove(requests.getPdpName(), requests)) { + // don't have the info we need to disable it + logger.warn("no requests with which to disable {}", requests.getPdpName()); + return; + } - // must restart from scratch - stopPublishing(); + logger.warn("disabling {}", requests.getPdpName()); - update = newRequest; - configure(new ModUpdateData(newRequest)); + requests.stopPublishing(); - startPublishing(); - } + // don't do anything if we don't have a group + String name = requests.getLastGroupName(); + if (name == null) { + logger.warn("no group with which to disable {}", requests.getPdpName()); + return; } - /** - * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if - * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing - * the STATE-CHANGE, then publishing is restarted. - * - * @param newRequest the new STATE-CHANGE request - */ - private void add(PdpStateChange newRequest) { - if (newRequest == null) { + // remove the PDP from the group + removeFromGroup(requests.getPdpName(), name); + + // send the state change + PdpStateChange change = new PdpStateChange(); + change.setName(requests.getPdpName()); + change.setState(PdpState.PASSIVE); + addRequest(change); + } + + /** + * Removes a PDP from its group. + * + * @param pdpName name of the PDP to be removed + * @param groupName name of the group from which it should be removed + */ + private void removeFromGroup(String pdpName, String groupName) { + + try (PolicyModelsProvider dao = daoFactory.create()) { + + PdpGroupFilter filter = PdpGroupFilter.builder().name(groupName).groupState(PdpState.ACTIVE) + .version(PdpGroupFilter.LATEST_VERSION).build(); + + List<PdpGroup> groups = dao.getFilteredPdpGroups(filter); + if (groups.isEmpty()) { return; } - synchronized (modifyLock) { - if (stateChange != null && isSame(stateChange, newRequest)) { - // already have this update - discard it + PdpGroup group = groups.get(0); + + for (PdpSubGroup subgrp : group.getPdpSubgroups()) { + if (removeFromSubgroup(pdpName, group, subgrp)) { + dao.updatePdpGroups(Collections.singletonList(group)); return; } + } - if (getWrapper() instanceof StateChangeData) { - // we were publishing STATE-CHANGE, thus must restart it - stopPublishing(); + } catch (PfModelException e) { + logger.info("unable to remove PDP {} from subgroup", pdpName, e); + } + } - stateChange = newRequest; - configure(new ModStateChangeData(newRequest)); + /** + * Removes a PDP from a subgroup. + * + * @param pdpName name of the PDP to be removed + * @param group group from which to attempt to remove the PDP + * @param subgrp subgroup from which to attempt to remove the PDP + * @return {@code true} if the PDP was removed, {@code false} if the PDP was not in + * the group + * @throws PfModelException if a DB error occurs + */ + private boolean removeFromSubgroup(String pdpName, PdpGroup group, PdpSubGroup subgrp) throws PfModelException { - startPublishing(); + Iterator<Pdp> iter = subgrp.getPdpInstances().iterator(); - } else { - // haven't started publishing STATE-CHANGE yet, just replace it - stateChange = newRequest; - } + while (iter.hasNext()) { + Pdp instance = iter.next(); + + if (pdpName.equals(instance.getInstanceId())) { + logger.info("removed {} from group={} version={} subgroup={}", pdpName, group.getName(), + group.getVersion(), subgrp.getPdpType()); + iter.remove(); + subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size()); + return true; } } - /** - * Indicates that the retry count was exhausted. - */ - protected void retryCountExhausted() { - // remove this request data from the PDP request map - allCompleted(); + return false; + } - // TODO what to do? - } + /** + * Creates a new set of requests for a PDP. May be overridden by junit tests. + * + * @param pdpName PDP name + * @return a new set of requests + */ + protected PdpRequests makePdpRequests(String pdpName) { + return new PdpRequests(pdpName); + } - /** - * Indicates that a response did not match the data. - * - * @param reason the reason for the mismatch - */ - protected void mismatch(String reason) { - // remove this request data from the PDP request map - allCompleted(); + /** + * Listener for singleton request events. + */ + private class SingletonListener implements RequestListener { + private final PdpRequests requests; + private final Request request; - // TODO what to do? + public SingletonListener(PdpRequests requests, Request request) { + this.requests = requests; + this.request = request; } - /** - * Wraps an UPDATE. - */ - private class ModUpdateData extends UpdateData { - - public ModUpdateData(PdpUpdate message) { - super(message, params); - } - - @Override - public void mismatch(String reason) { - ModifyReqData.this.mismatch(reason); + @Override + public void failure(String pdpName, String reason) { + if (requests.getPdpName().equals(pdpName)) { + disablePdp(requests); } + } - @Override - public void completed() { - if (stateChange == null) { - // no STATE-CHANGE request - we're done - allCompleted(); + @Override + public void success(String pdpName) { + if (requests.getPdpName().equals(pdpName)) { + if (pdp2requests.get(requests.getPdpName()) == requests) { + startNextRequest(requests, request); } else { - // now process the STATE-CHANGE request - configure(new ModStateChangeData(stateChange)); - startPublishing(); + logger.info("discard old requests for {}", pdpName); + requests.stopPublishing(); } } } - /** - * Wraps a STATE-CHANGE. - */ - private class ModStateChangeData extends StateChangeData { - - public ModStateChangeData(PdpStateChange message) { - super(message, params); - } - - @Override - public void mismatch(String reason) { - ModifyReqData.this.mismatch(reason); - } - - @Override - public void completed() { - allCompleted(); - } + @Override + public void retryCountExhausted() { + disablePdp(requests); } } - - // these may be overridden by junit tests - - protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) { - return new ModifyReqData(update, stateChange); - } } diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java new file mode 100644 index 00000000..9fbf36d4 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java @@ -0,0 +1,271 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import lombok.Getter; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.pap.main.comm.msgdata.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracks requests associated with a particular PDP. Requests may be broadcast requests or + * singleton requests (i.e., destined for a single PDP). + */ +public class PdpRequests { + private static final Logger logger = LoggerFactory.getLogger(PdpRequests.class); + + /** + * The maximum request priority + 1. + */ + private static final int MAX_PRIORITY = 2; + + /** + * Name of the PDP with which the requests are associated. + */ + @Getter + private final String pdpName; + + /** + * Index of request currently being published. + */ + private int curIndex = 0; + + /** + * Singleton requests. Items may be {@code null}. + */ + private Request[] singletons = new Request[MAX_PRIORITY]; + + /** + * Last group name to which the associated PDP was assigned. + */ + @Getter + private String lastGroupName; + + + /** + * Constructs the object. + * + * @param pdpName name of the PDP with which the requests are associated + */ + public PdpRequests(String pdpName) { + this.pdpName = pdpName; + } + + /** + * Records the group information from the request. + * + * @param request the request from which to extract the group information + */ + private void recordGroup(Request request) { + PdpMessage message = request.getMessage(); + if (message instanceof PdpUpdate) { + lastGroupName = message.getPdpGroup(); + } + } + + /** + * Adds a singleton request. + * + * @param request the request to be added + */ + public void addSingleton(Request request) { + + if (request.getMessage().getName() == null) { + throw new IllegalArgumentException("unexpected broadcast for " + pdpName); + } + + recordGroup(request); + + if (checkExisting(request)) { + // have an existing request that's similar - discard this request + return; + } + + // no existing request of this type + + int priority = request.getPriority(); + singletons[priority] = request; + + // stop publishing anything of a lower priority + QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority); + + // start publishing if nothing of higher priority + if (higherPrioritySingleton(priority)) { + logger.info("{} not publishing due to priority higher than {}", pdpName, priority); + return; + } + + curIndex = priority; + request.startPublishing(token); + } + + /** + * Checks for an existing request. + * + * @param request the request of interest + * @return {@code true} if a similar request already exists, {@code false} otherwise + */ + private boolean checkExisting(Request request) { + + return checkExistingSingleton(request); + } + + /** + * Checks for an existing singleton request. + * + * @param request the request of interest + * @return {@code true} if a similar singleton request already exists, {@code false} + * otherwise + */ + private boolean checkExistingSingleton(Request request) { + + Request exsingle = singletons[request.getPriority()]; + + if (exsingle == null) { + return false; + } + + if (exsingle.isSameContent(request)) { + // unchanged from existing request + logger.info("{} message content unchanged for {}", pdpName, exsingle.getClass().getSimpleName()); + return true; + } + + // reconfigure the existing request + PdpMessage message = request.getMessage(); + exsingle.reconfigure(message, null); + + // still have a singleton in the queue for this request + return true; + } + + /** + * Stops all publishing and removes this PDP from any broadcast messages. + */ + public void stopPublishing() { + // stop singletons + for (int x = 0; x < MAX_PRIORITY; ++x) { + Request single = singletons[x]; + + if (single != null) { + singletons[x] = null; + single.stopPublishing(); + } + } + } + + /** + * Stops publishing requests of a lower priority than the specified priority. + * + * @param priority priority of interest + * @return the token that was being used to publish a lower priority request + */ + private QueueToken<PdpMessage> stopPublishingLowerPriority(int priority) { + + // stop singletons + for (int x = 0; x < priority; ++x) { + Request single = singletons[x]; + + if (single != null) { + logger.info("{} stop publishing priority {}", pdpName, single.getPriority()); + + QueueToken<PdpMessage> token = single.stopPublishing(false); + if (token != null) { + // found one that was publishing + return token; + } + } + } + + return null; + } + + /** + * Starts publishing the next request in the queue. + * + * @param request the request that just completed + * @return {@code true} if there is another request in the queue, {@code false} if all + * requests for this PDP have been processed + */ + public boolean startNextRequest(Request request) { + if (!zapRequest(curIndex, request)) { + // not at curIndex - look for it in other indices + for (int x = 0; x < MAX_PRIORITY; ++x) { + if (zapRequest(x, request)) { + break; + } + } + } + + // find/start the highest priority request + for (curIndex = MAX_PRIORITY - 1; curIndex >= 0; --curIndex) { + + if (singletons[curIndex] != null) { + logger.info("{} start publishing priority {}", pdpName, curIndex); + + singletons[curIndex].startPublishing(); + return true; + } + } + + logger.info("{} has no more requests", pdpName); + curIndex = 0; + + return false; + } + + /** + * Zaps request pointers, if the request appears at the given index. + * + * @param index index to examine + * @param request request of interest + * @return {@code true} if a request pointer was zapped, {@code false} if the request + * did not appear at the given index + */ + private boolean zapRequest(int index, Request request) { + if (singletons[index] == request) { + singletons[index] = null; + return true; + } + + return false; + } + + /** + * Determines if any singleton request, with a higher priority, is associated with the + * PDP. + * + * @param priority priority of interest + * + * @return {@code true} if the PDP has a singleton, {@code false} otherwise + */ + private boolean higherPrioritySingleton(int priority) { + for (int x = priority + 1; x < MAX_PRIORITY; ++x) { + if (singletons[x] != null) { + return true; + } + } + + return false; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java deleted file mode 100644 index aa288f7d..00000000 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP PAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.pap.main.comm.msgdata; - -import org.onap.policy.models.pdp.concepts.PdpMessage; -import org.onap.policy.models.pdp.concepts.PdpStatus; -import org.onap.policy.pap.main.comm.TimerManager; - - -/** - * Wraps a message, providing methods appropriate to the message type. - */ -public abstract class MessageData { - private final PdpMessage message; - private final int maxRetries; - private final TimerManager timers; - - /** - * Constructs the object. - * - * @param message message to be wrapped by this - * @param maxRetries max number of retries - * @param timers the timer manager for messages of this type - */ - public MessageData(PdpMessage message, int maxRetries, TimerManager timers) { - this.message = message; - this.maxRetries = maxRetries; - this.timers = timers; - } - - /** - * Gets the wrapped message. - * - * @return the wrapped message - */ - public PdpMessage getMessage() { - return message; - } - - /** - * Gets a string, suitable for logging, identifying the message type. - * - * @return the message type - */ - public String getType() { - return message.getClass().getSimpleName(); - } - - /** - * Gets the maximum retry count for the particular message type. - * - * @return the maximum retry count - */ - public int getMaxRetryCount() { - return maxRetries; - } - - /** - * Gets the timer manager for the particular message type. - * - * @return the timer manager - */ - public TimerManager getTimers() { - return timers; - } - - /** - * Indicates that the response did not match what was expected. - * - * @param reason the reason for the mismatch - */ - public abstract void mismatch(String reason); - - /** - * Indicates that processing of this particular message has completed successfully. - */ - public abstract void completed(); - - /** - * Checks the response to ensure it is as expected. - * - * @param response the response to check - * @return an error message, if a fatal error has occurred, {@code null} otherwise - */ - public abstract String checkResponse(PdpStatus response); -} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java new file mode 100644 index 00000000..1f69dcb5 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java @@ -0,0 +1,124 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.QueueToken; + +/** + * Request data, whose message may be changed at any point, possibly triggering a restart + * of the publishing. + */ +public interface Request { + + /** + * Gets the request priority. Higher priority requests are published before lower + * priority requests. + * + * @return the request priority + */ + public int getPriority(); + + /** + * Gets the name with which this data is associated, used for logging purposes. This + * may be changed when this is reconfigured. + * + * @return the name with which this data is associated + */ + public String getName(); + + /** + * Gets the current message. + * + * @return the current message + */ + public PdpMessage getMessage(); + + /** + * Sets the listener that will receive request events. + * + * @param listener the request listener + */ + public void setListener(RequestListener listener); + + /** + * Determines if this request is currently being published. + * + * @return {@code true} if this request is being published, {@code false} otherwise + */ + public boolean isPublishing(); + + /** + * Starts the publishing process, registering any listeners or timeout handlers, and + * adding the request to the publisher queue. + */ + public void startPublishing(); + + /** + * Starts the publishing process. + * + * @param token2 token that can be used when publishing, or {@code null} to allocate a + * new token + */ + public void startPublishing(QueueToken<PdpMessage> token2); + + /** + * Unregisters the listener, cancels the timer, and removes the message from the + * queue. + */ + public void stopPublishing(); + + /** + * Unregisters the listener and cancels the timer. + * + * @param removeFromQueue {@code true} if the message should be removed from the + * queue, {@code false} otherwise + * @return the token that was being used to publish the message, or {@code null} if + * the request was not being published + */ + public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue); + + /** + * Reconfigures the fields based on the {@link #message} type. Suspends publishing, + * updates the configuration, and then resumes publishing. + * + * @param newMessage the new message + * @param token2 token to use when publishing, or {@code null} to allocate a new token + */ + public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2); + + /** + * Checks the response to ensure it is as expected. + * + * @param response the response to check + * @return an error message, if a fatal error has occurred, {@code null} otherwise + */ + public String checkResponse(PdpStatus response); + + /** + * Determines if this request has the same content as another request. + * + * @param other request against which to compare + * @return {@code true} if the requests have the same content, {@code false} otherwise + */ + public boolean isSameContent(Request other); +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java index 29ad85bc..45ca2db4 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.policy.pap.main.comm; +package org.onap.policy.pap.main.comm.msgdata; import lombok.AccessLevel; import lombok.Getter; @@ -28,54 +28,60 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.utils.services.ServiceManager; import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStatus; -import org.onap.policy.pap.main.comm.msgdata.MessageData; -import org.onap.policy.pap.main.parameters.RequestDataParams; +import org.onap.policy.pap.main.comm.QueueToken; +import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.parameters.RequestParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Request data, the content of which may be changed at any point, possibly triggering a - * restart of the publishing. + * Request data implementation. */ -public abstract class RequestData { - private static final Logger logger = LoggerFactory.getLogger(RequestData.class); +public abstract class RequestImpl implements Request { + private static final Logger logger = LoggerFactory.getLogger(RequestImpl.class); /** * Name with which this data is associated, used for logging purposes. */ @Getter - @Setter(AccessLevel.PROTECTED) - private String name; + private final String name; /** * The configuration parameters. */ - private final RequestDataParams params; + @Getter(AccessLevel.PROTECTED) + private final RequestParams params; /** - * Current retry count. + * Used to register/unregister the listener and the timer. */ - private int retryCount = 0; + private final ServiceManager svcmgr; /** - * Used to register/unregister the listener and the timer. + * Handles events associated with the request. + */ + @Setter + private RequestListener listener; + + /** + * Current retry count. */ - private ServiceManager svcmgr; + @Getter + private int retryCount = 0; /** - * Wrapper for the message that is currently being published (i.e., {@link #update} or - * {@link #stateChange}. + * The current message. */ - @Getter(AccessLevel.PROTECTED) - private MessageData wrapper; + @Getter + private PdpMessage message; /** - * Used to cancel a timer. + * The currently running timer. */ private TimerManager.Timer timer; /** - * Token that is placed on the queue. + * Token that has been placed on the queue. */ private QueueToken<PdpMessage> token = null; @@ -84,72 +90,119 @@ public abstract class RequestData { * Constructs the object, and validates the parameters. * * @param params configuration parameters + * @param name the request name, used for logging purposes + * @param message the initial message * * @throws IllegalArgumentException if a required parameter is not set */ - public RequestData(@NonNull RequestDataParams params) { + public RequestImpl(@NonNull RequestParams params, @NonNull String name, @NonNull PdpMessage message) { params.validate(); + this.name = name; this.params = params; + this.message = message; + + // @formatter:off + this.svcmgr = new ServiceManager(name) + .addAction("listener", + () -> params.getResponseDispatcher() + .register(this.message.getRequestId(), this::processResponse), + () -> params.getResponseDispatcher().unregister(this.message.getRequestId())) + .addAction("timer", + () -> timer = params.getTimers().register(this.message.getRequestId(), this::handleTimeout), + () -> timer.cancel()) + .addAction("enqueue", + () -> enqueue(), + () -> { + // do not remove from the queue - token may be re-used + }); + // @formatter:on } - /** - * Starts the publishing process, registering any listeners or timeout handlers, and - * adding the request to the publisher queue. This should not be invoked until after - * {@link #configure(MessageData)} is invoked. - */ + @Override + public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2) { + if (newMessage.getClass() != message.getClass()) { + throw new IllegalArgumentException("expecting " + message.getClass().getSimpleName() + " instead of " + + newMessage.getClass().getSimpleName()); + } + + logger.info("reconfiguring {} with new message", getName()); + + if (svcmgr.isAlive()) { + token = stopPublishing(false); + message = newMessage; + startPublishing(token2); + + } else { + message = newMessage; + } + } + + @Override + public boolean isPublishing() { + return svcmgr.isAlive(); + } + + @Override public void startPublishing() { + startPublishing(null); + } + + @Override + public void startPublishing(QueueToken<PdpMessage> token2) { + if (listener == null) { + throw new IllegalStateException("listener has not been set"); + } synchronized (params.getModifyLock()) { - if (!svcmgr.isAlive()) { + replaceToken(token2); + + if (svcmgr.isAlive()) { + logger.info("{} is already publishing", getName()); + + } else { + resetRetryCount(); svcmgr.start(); } } } /** - * Unregisters the listener and cancels the timer. + * Replaces the current token with a new token. + * @param newToken the new token */ - protected void stopPublishing() { - if (svcmgr.isAlive()) { - svcmgr.stop(); + private void replaceToken(QueueToken<PdpMessage> newToken) { + if (newToken != null) { + if (token == null) { + token = newToken; + + } else if (token != newToken) { + // already have a token - discard the new token + newToken.replaceItem(null); + } } } - /** - * Configures the fields based on the {@link #message} type. - * - * @param newWrapper the new message wrapper - */ - protected void configure(MessageData newWrapper) { - - wrapper = newWrapper; + @Override + public void stopPublishing() { + stopPublishing(true); + } - resetRetryCount(); + @Override + public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue) { + if (svcmgr.isAlive()) { + svcmgr.stop(); - TimerManager timerManager = wrapper.getTimers(); - String msgType = wrapper.getType(); - String reqid = wrapper.getMessage().getRequestId(); + if (removeFromQueue) { + token.replaceItem(null); + token = null; + } + } - /* - * We have to configure the service manager HERE, because it's name changes if the - * message class changes. - */ + QueueToken<PdpMessage> tok = token; + token = null; - // @formatter:off - this.svcmgr = new ServiceManager(name + " " + msgType) - .addAction("listener", - () -> params.getResponseDispatcher().register(reqid, this::processResponse), - () -> params.getResponseDispatcher().unregister(reqid)) - .addAction("timer", - () -> timer = timerManager.register(name, this::handleTimeout), - () -> timer.cancel()) - .addAction("enqueue", - () -> enqueue(), - () -> { - // nothing to "stop" - }); - // @formatter:on + return tok; } /** @@ -157,7 +210,6 @@ public abstract class RequestData { * if possible. Otherwise, it adds a new token to the queue. */ private void enqueue() { - PdpMessage message = wrapper.getMessage(); if (token != null && token.replaceItem(message) != null) { // took the other's place in the queue - continue using the token return; @@ -171,7 +223,7 @@ public abstract class RequestData { /** * Resets the retry count. */ - protected void resetRetryCount() { + public void resetRetryCount() { retryCount = 0; } @@ -180,8 +232,8 @@ public abstract class RequestData { * * @return {@code true} if successful, {@code false} if the limit has been reached */ - protected boolean bumpRetryCount() { - if (retryCount >= wrapper.getMaxRetryCount()) { + public boolean bumpRetryCount() { + if (retryCount >= params.getMaxRetryCount()) { return false; } @@ -190,15 +242,6 @@ public abstract class RequestData { } /** - * Indicates that the retry count was exhausted. The default method simply invokes - * {@link #allCompleted()}. - */ - protected void retryCountExhausted() { - // remove this request data from the PDP request map - allCompleted(); - } - - /** * Processes a response received from the PDP. * * @param infra infrastructure on which the response was received @@ -208,26 +251,22 @@ public abstract class RequestData { private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) { synchronized (params.getModifyLock()) { + String pdpName = response.getName(); + if (!svcmgr.isAlive()) { // this particular request must have been discarded return; } - stopPublishing(); - - if (!isActive()) { - return; - } - - String reason = wrapper.checkResponse(response); + String reason = checkResponse(response); if (reason != null) { logger.info("{} PDP data mismatch: {}", getName(), reason); - wrapper.mismatch(reason); - - } else { - logger.info("{} {} successful", getName(), wrapper.getType()); - wrapper.completed(); + listener.failure(pdpName, reason); + return; } + + logger.info("{} successful", getName()); + listener.success(pdpName); } } @@ -246,51 +285,36 @@ public abstract class RequestData { stopPublishing(); - if (!isActive()) { - return; - } - - if (isInQueue()) { - // haven't published yet - just leave it in the queue and reset counts - logger.info("{} timeout - request still in the queue", getName()); - resetRetryCount(); - startPublishing(); - return; - } - if (!bumpRetryCount()) { - logger.info("{} timeout - retry count exhausted", getName()); - retryCountExhausted(); + logger.info("{} timeout - retry count {} exhausted", getName(), retryCount); + listener.retryCountExhausted(); return; } // re-publish - logger.info("{} timeout - re-publish", getName()); + logger.info("{} timeout - re-publish count {}", getName(), retryCount); + + // startPublishing() resets the count, so save & restore it here + int count = retryCount; startPublishing(); + retryCount = count; } } /** - * Determines if the current message is still in the queue. Assumes that - * {@link #startPublishing()} has been invoked and thus {@link #token} has been - * initialized. - * - * @return {@code true} if the current message is in the queue, {@code false} - * otherwise + * Verifies that the name is not null. Also verifies that it matches the name in the + * message, if the message has a name. */ - private boolean isInQueue() { - return (token.get() == wrapper.getMessage()); - } + @Override + public String checkResponse(PdpStatus response) { + if (response.getName() == null) { + return "null PDP name"; + } - /** - * Determines if this request data is still active. - * - * @return {@code true} if this request is active, {@code false} otherwise - */ - protected abstract boolean isActive(); + if (message.getName() != null && !message.getName().equals(response.getName())) { + return "PDP name does not match"; + } - /** - * Indicates that this entire request has completed. - */ - protected abstract void allCompleted(); + return null; + } } diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java new file mode 100644 index 00000000..4c53bd64 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +/** + * Listener for request events. + */ +public interface RequestListener { + + /** + * Indicates that an invalid response was received from a PDP. + * + * @param pdpName name of the PDP from which the response was received + * @param reason the reason for the mismatch + */ + public void failure(String pdpName, String reason); + + /** + * Indicates that a successful response was received from a PDP. + * + * @param pdpName name of the PDP from which the response was received + */ + public void success(String pdpName); + + /** + * Indicates that the retry count was exhausted. + */ + public void retryCountExhausted(); +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java index ecbf5dfa..e38cb0fd 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java @@ -20,38 +20,61 @@ package org.onap.policy.pap.main.comm.msgdata; +import org.onap.policy.models.pdp.concepts.PdpMessage; import org.onap.policy.models.pdp.concepts.PdpStateChange; import org.onap.policy.models.pdp.concepts.PdpStatus; -import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.RequestParams; /** * Wraps a STATE-CHANGE. */ -public abstract class StateChangeData extends MessageData { - private PdpStateChange stateChange; +public class StateChangeReq extends RequestImpl { /** - * Constructs the object. + * Constructs the object, and validates the parameters. * - * @param message message to be wrapped by this - * @param params the parameters + * @param params configuration parameters + * @param name the request name, used for logging purposes + * @param message the initial message + * + * @throws IllegalArgumentException if a required parameter is not set */ - public StateChangeData(PdpStateChange message, PdpModifyRequestMapParams params) { - super(message, params.getParams().getStateChangeParameters().getMaxRetryCount(), params.getStateChangeTimers()); + public StateChangeReq(RequestParams params, String name, PdpMessage message) { + super(params, name, message); + } - stateChange = message; + @Override + public PdpStateChange getMessage() { + return (PdpStateChange) super.getMessage(); } @Override public String checkResponse(PdpStatus response) { - if (!stateChange.getName().equals(response.getName())) { - return "name does not match"; + String reason = super.checkResponse(response); + if (reason != null) { + return reason; } - if (response.getState() != stateChange.getState()) { - return "state is " + response.getState() + ", but expected " + stateChange.getState(); + PdpStateChange message = getMessage(); + if (response.getState() != message.getState()) { + return "state is " + response.getState() + ", but expected " + message.getState(); } return null; } + + @Override + public boolean isSameContent(Request other) { + if (!(other instanceof StateChangeReq)) { + return false; + } + + PdpStateChange message = (PdpStateChange) other.getMessage(); + return (getMessage().getState() == message.getState()); + } + + @Override + public int getPriority() { + return 0; + } } diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java deleted file mode 100644 index eca0b384..00000000 --- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP PAP - * ================================================================================ - * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.pap.main.comm.msgdata; - -import java.util.HashSet; -import java.util.Set; -import org.onap.policy.models.pdp.concepts.PdpStatus; -import org.onap.policy.models.pdp.concepts.PdpUpdate; -import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; -import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; -import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; - - -/** - * Wraps an UPDATE. - */ -public abstract class UpdateData extends MessageData { - private PdpUpdate update; - - /** - * Constructs the object. - * - * @param message message to be wrapped by this - * @param params the parameters - */ - public UpdateData(PdpUpdate message, PdpModifyRequestMapParams params) { - super(message, params.getParams().getUpdateParameters().getMaxRetryCount(), params.getUpdateTimers()); - - update = message; - } - - @Override - public String checkResponse(PdpStatus response) { - if (!update.getName().equals(response.getName())) { - return "name does not match"; - } - - if (!update.getPdpGroup().equals(response.getPdpGroup())) { - return "group does not match"; - } - - if (!update.getPdpSubgroup().equals(response.getPdpSubgroup())) { - return "subgroup does not match"; - } - - // see if the other has any policies that this does not have - Set<ToscaPolicyIdentifier> set = new HashSet<>(response.getPolicies()); - - for (ToscaPolicy policy : update.getPolicies()) { - set.remove(policy.getIdentifier()); - } - - if (!set.isEmpty()) { - return "policies do not match"; - } - - return null; - } -} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java new file mode 100644 index 00000000..0f6d73fc --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; +import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; +import org.onap.policy.pap.main.parameters.RequestParams; + + +/** + * Wraps an UPDATE. + */ +public class UpdateReq extends RequestImpl { + + /** + * Constructs the object, and validates the parameters. + * + * @param params configuration parameters + * @param name the request name, used for logging purposes + * @param message the initial message + * + * @throws IllegalArgumentException if a required parameter is not set + */ + public UpdateReq(RequestParams params, String name, PdpMessage message) { + super(params, name, message); + } + + @Override + public PdpUpdate getMessage() { + return (PdpUpdate) super.getMessage(); + } + + @Override + public String checkResponse(PdpStatus response) { + String reason = super.checkResponse(response); + if (reason != null) { + return reason; + } + + PdpUpdate message = (PdpUpdate) getMessage(); + if (!StringUtils.equals(message.getPdpGroup(), response.getPdpGroup())) { + return "group does not match"; + } + + if (!StringUtils.equals(message.getPdpSubgroup(), response.getPdpSubgroup())) { + return "subgroup does not match"; + } + + // see if the policies match + Set<ToscaPolicyIdentifier> set1 = new HashSet<>(response.getPolicies()); + Set<ToscaPolicyIdentifier> set2 = new HashSet<>( + message.getPolicies().stream().map(ToscaPolicy::getIdentifier).collect(Collectors.toSet())); + + if (!set1.equals(set2)) { + return "policies do not match"; + } + + return null; + } + + @Override + public boolean isSameContent(Request other) { + if (!(other instanceof UpdateReq)) { + return false; + } + + PdpUpdate first = getMessage(); + PdpUpdate second = (PdpUpdate) other.getMessage(); + + if (!StringUtils.equals(first.getPdpGroup(), second.getPdpGroup())) { + return false; + } + + if (!StringUtils.equals(first.getPdpSubgroup(), second.getPdpSubgroup())) { + return false; + } + + // see if the policies are the same + Set<ToscaPolicy> set1 = new HashSet<>(first.getPolicies()); + Set<ToscaPolicy> set2 = new HashSet<>(second.getPolicies()); + + return set1.equals(set2); + } + + @Override + public int getPriority() { + return 1; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java index 2c17a0b2..2f74bf3d 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java @@ -23,6 +23,7 @@ package org.onap.policy.pap.main.parameters; import lombok.Getter; import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; @@ -31,10 +32,14 @@ import org.onap.policy.pap.main.comm.TimerManager; * Parameters needed to create a {@link PdpModifyRequestMapParams}. */ @Getter -public class PdpModifyRequestMapParams extends RequestDataParams { +public class PdpModifyRequestMapParams { + private Publisher publisher; + private RequestIdDispatcher<PdpStatus> responseDispatcher; + private Object modifyLock; private PdpParameters params; private TimerManager updateTimers; private TimerManager stateChangeTimers; + private PolicyModelsProviderFactoryWrapper daoFactory; public PdpModifyRequestMapParams setParams(PdpParameters params) { this.params = params; @@ -51,27 +56,41 @@ public class PdpModifyRequestMapParams extends RequestDataParams { return this; } - @Override + public PdpModifyRequestMapParams setDaoFactory(PolicyModelsProviderFactoryWrapper daoFactory) { + this.daoFactory = daoFactory; + return this; + } + public PdpModifyRequestMapParams setPublisher(Publisher publisher) { - super.setPublisher(publisher); + this.publisher = publisher; return this; } - @Override public PdpModifyRequestMapParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) { - super.setResponseDispatcher(responseDispatcher); + this.responseDispatcher = responseDispatcher; return this; } - @Override public PdpModifyRequestMapParams setModifyLock(Object modifyLock) { - super.setModifyLock(modifyLock); + this.modifyLock = modifyLock; return this; } - @Override + /** + * Validates the parameters. + */ public void validate() { - super.validate(); + if (publisher == null) { + throw new IllegalArgumentException("missing publisher"); + } + + if (responseDispatcher == null) { + throw new IllegalArgumentException("missing responseDispatcher"); + } + + if (modifyLock == null) { + throw new IllegalArgumentException("missing modifyLock"); + } if (params == null) { throw new IllegalArgumentException("missing PDP parameters"); diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java index ea4b02ca..b9083864 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java @@ -24,33 +24,46 @@ import lombok.Getter; import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.pap.main.comm.Publisher; -import org.onap.policy.pap.main.comm.RequestData; +import org.onap.policy.pap.main.comm.TimerManager; /** - * Parameters needed to create a {@link RequestData}. + * Parameters needed to create a Request. */ @Getter -public class RequestDataParams { +public class RequestParams { private Publisher publisher; private RequestIdDispatcher<PdpStatus> responseDispatcher; private Object modifyLock; + private TimerManager timers; + private int maxRetryCount; - public RequestDataParams setPublisher(Publisher publisher) { + + public RequestParams setPublisher(Publisher publisher) { this.publisher = publisher; return this; } - public RequestDataParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) { + public RequestParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) { this.responseDispatcher = responseDispatcher; return this; } - public RequestDataParams setModifyLock(Object modifyLock) { + public RequestParams setModifyLock(Object modifyLock) { this.modifyLock = modifyLock; return this; } + public RequestParams setTimers(TimerManager timers) { + this.timers = timers; + return this; + } + + public RequestParams setMaxRetryCount(int maxRetryCount) { + this.maxRetryCount = maxRetryCount; + return this; + } + /** * Validates the parameters. */ @@ -66,5 +79,9 @@ public class RequestDataParams { if (modifyLock == null) { throw new IllegalArgumentException("missing modifyLock"); } + + if (timers == null) { + throw new IllegalArgumentException("missing timers"); + } } } |