aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main
diff options
context:
space:
mode:
authorJorge Hernandez <jorge.hernandez-herrero@att.com>2019-04-10 18:23:32 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-10 18:23:32 +0000
commit2de916125dc485d15bdebc8b9e574b855bafc368 (patch)
tree011d2a69bc14d92163ac1e8ab25f9338204e0a52 /main/src/main
parent84da2c8e05ff63163c4431284115c97b29ff1fae (diff)
parentf12eed0a3097518f49731bb722a0063b52d36b2a (diff)
Merge "Refactor request map"
Diffstat (limited to 'main/src/main')
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java457
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java271
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java104
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java124
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java (renamed from main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java)264
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java47
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java (renamed from main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java)49
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java77
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java114
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java37
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java (renamed from main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java)29
11 files changed, 1009 insertions, 564 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
index 24443cc2..6a743a31 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
@@ -20,26 +20,43 @@
package org.onap.policy.pap.main.comm;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pdp.concepts.Pdp;
+import org.onap.policy.models.pdp.concepts.PdpGroup;
+import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
-import org.onap.policy.pap.main.comm.msgdata.StateChangeData;
-import org.onap.policy.pap.main.comm.msgdata.UpdateData;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
+import org.onap.policy.pap.main.comm.msgdata.Request;
+import org.onap.policy.pap.main.comm.msgdata.RequestListener;
+import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
+import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.RequestParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Maps a PDP name to requests that modify PDPs.
*/
public class PdpModifyRequestMap {
+ private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
+
+ private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
/**
- * Maps a PDP name to its request data. An entry is removed once all of the requests
- * within the data have been completed.
+ * Maps a PDP name to its outstanding requests.
*/
- private final Map<String, ModifyReqData> name2data = new HashMap<>();
+ private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
/**
* PDP modification lock.
@@ -52,7 +69,13 @@ public class PdpModifyRequestMap {
private final PdpModifyRequestMapParams params;
/**
- * Constructs the data.
+ * Factory for PAP DAO.
+ */
+ private final PolicyModelsProviderFactoryWrapper daoFactory;
+
+
+ /**
+ * Constructs the object.
*
* @param params configuration parameters
*
@@ -63,24 +86,21 @@ public class PdpModifyRequestMap {
this.params = params;
this.modifyLock = params.getModifyLock();
+ this.daoFactory = params.getDaoFactory();
}
/**
- * Adds an UPDATE request to the map.
- *
- * @param update the UPDATE request or {@code null}
- */
- public void addRequest(PdpUpdate update) {
- addRequest(update, null);
- }
-
- /**
- * Adds STATE-CHANGE request to the map.
+ * Stops publishing requests to the given PDP.
*
- * @param stateChange the STATE-CHANGE request or {@code null}
+ * @param pdpName PDP name
*/
- public void addRequest(PdpStateChange stateChange) {
- addRequest(null, stateChange);
+ public void stopPublishing(String pdpName) {
+ synchronized (modifyLock) {
+ PdpRequests requests = pdp2requests.remove(pdpName);
+ if (requests != null) {
+ requests.stopPublishing();
+ }
+ }
}
/**
@@ -90,288 +110,255 @@ public class PdpModifyRequestMap {
* @param stateChange the STATE-CHANGE request or {@code null}
*/
public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
- if (update == null && stateChange == null) {
- return;
- }
+ if (update == null) {
+ addRequest(stateChange);
- synchronized (modifyLock) {
- String pdpName = getPdpName(update, stateChange);
-
- ModifyReqData data = name2data.get(pdpName);
- if (data != null) {
- // update the existing request
- data.add(update);
- data.add(stateChange);
-
- } else {
- data = makeRequestData(update, stateChange);
- name2data.put(pdpName, data);
- data.startPublishing();
+ } else {
+ synchronized (modifyLock) {
+ addRequest(update);
+ addRequest(stateChange);
}
}
}
/**
- * Gets the PDP name from two requests.
+ * Adds an UPDATE request to the map.
*
- * @param update the update request, or {@code null}
- * @param stateChange the state-change request, or {@code null}
- * @return the PDP name, or {@code null} if both requests are {@code null}
+ * @param update the UPDATE request or {@code null}
*/
- private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) {
- String pdpName;
+ public void addRequest(PdpUpdate update) {
+ if (update == null) {
+ return;
+ }
- if (update != null) {
- if ((pdpName = update.getName()) == null) {
- throw new IllegalArgumentException("missing name in " + update);
- }
+ if (isBroadcast(update)) {
+ throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
+ }
- if (stateChange != null && !pdpName.equals(stateChange.getName())) {
- throw new IllegalArgumentException(
- "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange);
- }
+ // @formatter:off
+ RequestParams reqparams = new RequestParams()
+ .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
+ .setTimers(params.getUpdateTimers())
+ .setModifyLock(params.getModifyLock())
+ .setPublisher(params.getPublisher())
+ .setResponseDispatcher(params.getResponseDispatcher());
+ // @formatter:on
- } else {
- if ((pdpName = stateChange.getName()) == null) {
- throw new IllegalArgumentException("missing name in " + stateChange);
- }
- }
+ String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
+ UpdateReq request = new UpdateReq(reqparams, name, update);
- return pdpName;
+ addSingleton(request);
}
/**
- * Determines if two requests are the "same", which is does not necessarily mean
- * "equals".
+ * Adds a STATE-CHANGE request to the map.
*
- * @param first first request to check
- * @param second second request to check
- * @return {@code true} if the requests are the "same", {@code false} otherwise
+ * @param stateChange the STATE-CHANGE request or {@code null}
*/
- protected static boolean isSame(PdpUpdate first, PdpUpdate second) {
- if (first.getPolicies().size() != second.getPolicies().size()) {
- return false;
+ public void addRequest(PdpStateChange stateChange) {
+ if (stateChange == null) {
+ return;
}
- if (!first.getPdpGroup().equals(second.getPdpGroup())) {
- return false;
+ if (isBroadcast(stateChange)) {
+ throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
}
- if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) {
- return false;
- }
+ // @formatter:off
+ RequestParams reqparams = new RequestParams()
+ .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
+ .setTimers(params.getStateChangeTimers())
+ .setModifyLock(params.getModifyLock())
+ .setPublisher(params.getPublisher())
+ .setResponseDispatcher(params.getResponseDispatcher());
+ // @formatter:on
- // see if the other has any policies that this does not have
- ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies());
- lst.removeAll(first.getPolicies());
+ String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
+ StateChangeReq request = new StateChangeReq(reqparams, name, stateChange);
- return lst.isEmpty();
+ addSingleton(request);
}
/**
- * Determines if two requests are the "same", which is does not necessarily mean
- * "equals".
+ * Determines if a message is a broadcast message.
*
- * @param first first request to check
- * @param second second request to check
- * @return {@code true} if this update subsumes the other, {@code false} otherwise
+ * @param message the message to examine
+ * @return {@code true} if the message is a broadcast message, {@code false} if
+ * destined for a single PDP
*/
- protected static boolean isSame(PdpStateChange first, PdpStateChange second) {
- return (first.getState() == second.getState());
+ private boolean isBroadcast(PdpMessage message) {
+ return (message.getName() == null);
}
/**
- * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The
- * UPDATE is always published before the STATE-CHANGE. In addition, both requests may
- * be changed at any point, possibly triggering a restart of the publishing.
+ * Configures and adds a request to the map.
+ *
+ * @param request the request to be added
*/
- public class ModifyReqData extends RequestData {
-
- /**
- * The UPDATE message to be published, or {@code null}.
- */
- private PdpUpdate update;
-
- /**
- * The STATE-CHANGE message to be published, or {@code null}.
- */
- private PdpStateChange stateChange;
-
-
- /**
- * Constructs the object.
- *
- * @param newUpdate the UPDATE message to be sent, or {@code null}
- * @param newState the STATE-CHANGE message to be sent, or {@code null}
- */
- public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) {
- super(params);
-
- if (newUpdate != null) {
- this.stateChange = newState;
- setName(newUpdate.getName());
- update = newUpdate;
- configure(new ModUpdateData(newUpdate));
-
- } else {
- this.update = null;
- setName(newState.getName());
- stateChange = newState;
- configure(new ModStateChangeData(newState));
- }
- }
+ private void addSingleton(Request request) {
- /**
- * Determines if this request is still in the map.
- */
- @Override
- protected boolean isActive() {
- return (name2data.get(getName()) == this);
+ synchronized (modifyLock) {
+ PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
+
+ request.setListener(new SingletonListener(requests, request));
+ requests.addSingleton(request);
}
+ }
- /**
- * Removes this request from the map.
- */
- @Override
- protected void allCompleted() {
- name2data.remove(getName(), this);
+ /**
+ * Starts the next request associated with a PDP.
+ *
+ * @param requests current set of requests
+ * @param request the request that just completed
+ */
+ private void startNextRequest(PdpRequests requests, Request request) {
+ if (!requests.startNextRequest(request)) {
+ pdp2requests.remove(requests.getPdpName(), requests);
}
+ }
- /**
- * Adds an UPDATE to the request data, replacing any existing UPDATE, if
- * appropriate. If the UPDATE is replaced, then publishing is restarted.
- *
- * @param newRequest the new UPDATE request
- */
- private void add(PdpUpdate newRequest) {
- if (newRequest == null) {
- return;
- }
+ /**
+ * Disables a PDP by removing it from its subgroup and then sending it a PASSIVE
+ * request.
+ *
+ * @param requests the requests associated with the PDP to be disabled
+ */
+ private void disablePdp(PdpRequests requests) {
- synchronized (modifyLock) {
- if (update != null && isSame(update, newRequest)) {
- // already have this update - discard it
- return;
- }
+ // remove the requests from the map
+ if (!pdp2requests.remove(requests.getPdpName(), requests)) {
+ // don't have the info we need to disable it
+ logger.warn("no requests with which to disable {}", requests.getPdpName());
+ return;
+ }
- // must restart from scratch
- stopPublishing();
+ logger.warn("disabling {}", requests.getPdpName());
- update = newRequest;
- configure(new ModUpdateData(newRequest));
+ requests.stopPublishing();
- startPublishing();
- }
+ // don't do anything if we don't have a group
+ String name = requests.getLastGroupName();
+ if (name == null) {
+ logger.warn("no group with which to disable {}", requests.getPdpName());
+ return;
}
- /**
- * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if
- * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing
- * the STATE-CHANGE, then publishing is restarted.
- *
- * @param newRequest the new STATE-CHANGE request
- */
- private void add(PdpStateChange newRequest) {
- if (newRequest == null) {
+ // remove the PDP from the group
+ removeFromGroup(requests.getPdpName(), name);
+
+ // send the state change
+ PdpStateChange change = new PdpStateChange();
+ change.setName(requests.getPdpName());
+ change.setState(PdpState.PASSIVE);
+ addRequest(change);
+ }
+
+ /**
+ * Removes a PDP from its group.
+ *
+ * @param pdpName name of the PDP to be removed
+ * @param groupName name of the group from which it should be removed
+ */
+ private void removeFromGroup(String pdpName, String groupName) {
+
+ try (PolicyModelsProvider dao = daoFactory.create()) {
+
+ PdpGroupFilter filter = PdpGroupFilter.builder().name(groupName).groupState(PdpState.ACTIVE)
+ .version(PdpGroupFilter.LATEST_VERSION).build();
+
+ List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
+ if (groups.isEmpty()) {
return;
}
- synchronized (modifyLock) {
- if (stateChange != null && isSame(stateChange, newRequest)) {
- // already have this update - discard it
+ PdpGroup group = groups.get(0);
+
+ for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+ if (removeFromSubgroup(pdpName, group, subgrp)) {
+ dao.updatePdpGroups(Collections.singletonList(group));
return;
}
+ }
- if (getWrapper() instanceof StateChangeData) {
- // we were publishing STATE-CHANGE, thus must restart it
- stopPublishing();
+ } catch (PfModelException e) {
+ logger.info("unable to remove PDP {} from subgroup", pdpName, e);
+ }
+ }
- stateChange = newRequest;
- configure(new ModStateChangeData(newRequest));
+ /**
+ * Removes a PDP from a subgroup.
+ *
+ * @param pdpName name of the PDP to be removed
+ * @param group group from which to attempt to remove the PDP
+ * @param subgrp subgroup from which to attempt to remove the PDP
+ * @return {@code true} if the PDP was removed, {@code false} if the PDP was not in
+ * the group
+ * @throws PfModelException if a DB error occurs
+ */
+ private boolean removeFromSubgroup(String pdpName, PdpGroup group, PdpSubGroup subgrp) throws PfModelException {
- startPublishing();
+ Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
- } else {
- // haven't started publishing STATE-CHANGE yet, just replace it
- stateChange = newRequest;
- }
+ while (iter.hasNext()) {
+ Pdp instance = iter.next();
+
+ if (pdpName.equals(instance.getInstanceId())) {
+ logger.info("removed {} from group={} version={} subgroup={}", pdpName, group.getName(),
+ group.getVersion(), subgrp.getPdpType());
+ iter.remove();
+ subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
+ return true;
}
}
- /**
- * Indicates that the retry count was exhausted.
- */
- protected void retryCountExhausted() {
- // remove this request data from the PDP request map
- allCompleted();
+ return false;
+ }
- // TODO what to do?
- }
+ /**
+ * Creates a new set of requests for a PDP. May be overridden by junit tests.
+ *
+ * @param pdpName PDP name
+ * @return a new set of requests
+ */
+ protected PdpRequests makePdpRequests(String pdpName) {
+ return new PdpRequests(pdpName);
+ }
- /**
- * Indicates that a response did not match the data.
- *
- * @param reason the reason for the mismatch
- */
- protected void mismatch(String reason) {
- // remove this request data from the PDP request map
- allCompleted();
+ /**
+ * Listener for singleton request events.
+ */
+ private class SingletonListener implements RequestListener {
+ private final PdpRequests requests;
+ private final Request request;
- // TODO what to do?
+ public SingletonListener(PdpRequests requests, Request request) {
+ this.requests = requests;
+ this.request = request;
}
- /**
- * Wraps an UPDATE.
- */
- private class ModUpdateData extends UpdateData {
-
- public ModUpdateData(PdpUpdate message) {
- super(message, params);
- }
-
- @Override
- public void mismatch(String reason) {
- ModifyReqData.this.mismatch(reason);
+ @Override
+ public void failure(String pdpName, String reason) {
+ if (requests.getPdpName().equals(pdpName)) {
+ disablePdp(requests);
}
+ }
- @Override
- public void completed() {
- if (stateChange == null) {
- // no STATE-CHANGE request - we're done
- allCompleted();
+ @Override
+ public void success(String pdpName) {
+ if (requests.getPdpName().equals(pdpName)) {
+ if (pdp2requests.get(requests.getPdpName()) == requests) {
+ startNextRequest(requests, request);
} else {
- // now process the STATE-CHANGE request
- configure(new ModStateChangeData(stateChange));
- startPublishing();
+ logger.info("discard old requests for {}", pdpName);
+ requests.stopPublishing();
}
}
}
- /**
- * Wraps a STATE-CHANGE.
- */
- private class ModStateChangeData extends StateChangeData {
-
- public ModStateChangeData(PdpStateChange message) {
- super(message, params);
- }
-
- @Override
- public void mismatch(String reason) {
- ModifyReqData.this.mismatch(reason);
- }
-
- @Override
- public void completed() {
- allCompleted();
- }
+ @Override
+ public void retryCountExhausted() {
+ disablePdp(requests);
}
}
-
- // these may be overridden by junit tests
-
- protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
- return new ModifyReqData(update, stateChange);
- }
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java
new file mode 100644
index 00000000..9fbf36d4
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpRequests.java
@@ -0,0 +1,271 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import lombok.Getter;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.pap.main.comm.msgdata.Request;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks requests associated with a particular PDP. Requests may be broadcast requests or
+ * singleton requests (i.e., destined for a single PDP).
+ */
+public class PdpRequests {
+ private static final Logger logger = LoggerFactory.getLogger(PdpRequests.class);
+
+ /**
+ * The maximum request priority + 1.
+ */
+ private static final int MAX_PRIORITY = 2;
+
+ /**
+ * Name of the PDP with which the requests are associated.
+ */
+ @Getter
+ private final String pdpName;
+
+ /**
+ * Index of request currently being published.
+ */
+ private int curIndex = 0;
+
+ /**
+ * Singleton requests. Items may be {@code null}.
+ */
+ private Request[] singletons = new Request[MAX_PRIORITY];
+
+ /**
+ * Last group name to which the associated PDP was assigned.
+ */
+ @Getter
+ private String lastGroupName;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param pdpName name of the PDP with which the requests are associated
+ */
+ public PdpRequests(String pdpName) {
+ this.pdpName = pdpName;
+ }
+
+ /**
+ * Records the group information from the request.
+ *
+ * @param request the request from which to extract the group information
+ */
+ private void recordGroup(Request request) {
+ PdpMessage message = request.getMessage();
+ if (message instanceof PdpUpdate) {
+ lastGroupName = message.getPdpGroup();
+ }
+ }
+
+ /**
+ * Adds a singleton request.
+ *
+ * @param request the request to be added
+ */
+ public void addSingleton(Request request) {
+
+ if (request.getMessage().getName() == null) {
+ throw new IllegalArgumentException("unexpected broadcast for " + pdpName);
+ }
+
+ recordGroup(request);
+
+ if (checkExisting(request)) {
+ // have an existing request that's similar - discard this request
+ return;
+ }
+
+ // no existing request of this type
+
+ int priority = request.getPriority();
+ singletons[priority] = request;
+
+ // stop publishing anything of a lower priority
+ QueueToken<PdpMessage> token = stopPublishingLowerPriority(priority);
+
+ // start publishing if nothing of higher priority
+ if (higherPrioritySingleton(priority)) {
+ logger.info("{} not publishing due to priority higher than {}", pdpName, priority);
+ return;
+ }
+
+ curIndex = priority;
+ request.startPublishing(token);
+ }
+
+ /**
+ * Checks for an existing request.
+ *
+ * @param request the request of interest
+ * @return {@code true} if a similar request already exists, {@code false} otherwise
+ */
+ private boolean checkExisting(Request request) {
+
+ return checkExistingSingleton(request);
+ }
+
+ /**
+ * Checks for an existing singleton request.
+ *
+ * @param request the request of interest
+ * @return {@code true} if a similar singleton request already exists, {@code false}
+ * otherwise
+ */
+ private boolean checkExistingSingleton(Request request) {
+
+ Request exsingle = singletons[request.getPriority()];
+
+ if (exsingle == null) {
+ return false;
+ }
+
+ if (exsingle.isSameContent(request)) {
+ // unchanged from existing request
+ logger.info("{} message content unchanged for {}", pdpName, exsingle.getClass().getSimpleName());
+ return true;
+ }
+
+ // reconfigure the existing request
+ PdpMessage message = request.getMessage();
+ exsingle.reconfigure(message, null);
+
+ // still have a singleton in the queue for this request
+ return true;
+ }
+
+ /**
+ * Stops all publishing and removes this PDP from any broadcast messages.
+ */
+ public void stopPublishing() {
+ // stop singletons
+ for (int x = 0; x < MAX_PRIORITY; ++x) {
+ Request single = singletons[x];
+
+ if (single != null) {
+ singletons[x] = null;
+ single.stopPublishing();
+ }
+ }
+ }
+
+ /**
+ * Stops publishing requests of a lower priority than the specified priority.
+ *
+ * @param priority priority of interest
+ * @return the token that was being used to publish a lower priority request
+ */
+ private QueueToken<PdpMessage> stopPublishingLowerPriority(int priority) {
+
+ // stop singletons
+ for (int x = 0; x < priority; ++x) {
+ Request single = singletons[x];
+
+ if (single != null) {
+ logger.info("{} stop publishing priority {}", pdpName, single.getPriority());
+
+ QueueToken<PdpMessage> token = single.stopPublishing(false);
+ if (token != null) {
+ // found one that was publishing
+ return token;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Starts publishing the next request in the queue.
+ *
+ * @param request the request that just completed
+ * @return {@code true} if there is another request in the queue, {@code false} if all
+ * requests for this PDP have been processed
+ */
+ public boolean startNextRequest(Request request) {
+ if (!zapRequest(curIndex, request)) {
+ // not at curIndex - look for it in other indices
+ for (int x = 0; x < MAX_PRIORITY; ++x) {
+ if (zapRequest(x, request)) {
+ break;
+ }
+ }
+ }
+
+ // find/start the highest priority request
+ for (curIndex = MAX_PRIORITY - 1; curIndex >= 0; --curIndex) {
+
+ if (singletons[curIndex] != null) {
+ logger.info("{} start publishing priority {}", pdpName, curIndex);
+
+ singletons[curIndex].startPublishing();
+ return true;
+ }
+ }
+
+ logger.info("{} has no more requests", pdpName);
+ curIndex = 0;
+
+ return false;
+ }
+
+ /**
+ * Zaps request pointers, if the request appears at the given index.
+ *
+ * @param index index to examine
+ * @param request request of interest
+ * @return {@code true} if a request pointer was zapped, {@code false} if the request
+ * did not appear at the given index
+ */
+ private boolean zapRequest(int index, Request request) {
+ if (singletons[index] == request) {
+ singletons[index] = null;
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Determines if any singleton request, with a higher priority, is associated with the
+ * PDP.
+ *
+ * @param priority priority of interest
+ *
+ * @return {@code true} if the PDP has a singleton, {@code false} otherwise
+ */
+ private boolean higherPrioritySingleton(int priority) {
+ for (int x = priority + 1; x < MAX_PRIORITY; ++x) {
+ if (singletons[x] != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java
deleted file mode 100644
index aa288f7d..00000000
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP PAP
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.pap.main.comm.msgdata;
-
-import org.onap.policy.models.pdp.concepts.PdpMessage;
-import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.pap.main.comm.TimerManager;
-
-
-/**
- * Wraps a message, providing methods appropriate to the message type.
- */
-public abstract class MessageData {
- private final PdpMessage message;
- private final int maxRetries;
- private final TimerManager timers;
-
- /**
- * Constructs the object.
- *
- * @param message message to be wrapped by this
- * @param maxRetries max number of retries
- * @param timers the timer manager for messages of this type
- */
- public MessageData(PdpMessage message, int maxRetries, TimerManager timers) {
- this.message = message;
- this.maxRetries = maxRetries;
- this.timers = timers;
- }
-
- /**
- * Gets the wrapped message.
- *
- * @return the wrapped message
- */
- public PdpMessage getMessage() {
- return message;
- }
-
- /**
- * Gets a string, suitable for logging, identifying the message type.
- *
- * @return the message type
- */
- public String getType() {
- return message.getClass().getSimpleName();
- }
-
- /**
- * Gets the maximum retry count for the particular message type.
- *
- * @return the maximum retry count
- */
- public int getMaxRetryCount() {
- return maxRetries;
- }
-
- /**
- * Gets the timer manager for the particular message type.
- *
- * @return the timer manager
- */
- public TimerManager getTimers() {
- return timers;
- }
-
- /**
- * Indicates that the response did not match what was expected.
- *
- * @param reason the reason for the mismatch
- */
- public abstract void mismatch(String reason);
-
- /**
- * Indicates that processing of this particular message has completed successfully.
- */
- public abstract void completed();
-
- /**
- * Checks the response to ensure it is as expected.
- *
- * @param response the response to check
- * @return an error message, if a fatal error has occurred, {@code null} otherwise
- */
- public abstract String checkResponse(PdpStatus response);
-}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java
new file mode 100644
index 00000000..1f69dcb5
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/Request.java
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.QueueToken;
+
+/**
+ * Request data, whose message may be changed at any point, possibly triggering a restart
+ * of the publishing.
+ */
+public interface Request {
+
+ /**
+ * Gets the request priority. Higher priority requests are published before lower
+ * priority requests.
+ *
+ * @return the request priority
+ */
+ public int getPriority();
+
+ /**
+ * Gets the name with which this data is associated, used for logging purposes. This
+ * may be changed when this is reconfigured.
+ *
+ * @return the name with which this data is associated
+ */
+ public String getName();
+
+ /**
+ * Gets the current message.
+ *
+ * @return the current message
+ */
+ public PdpMessage getMessage();
+
+ /**
+ * Sets the listener that will receive request events.
+ *
+ * @param listener the request listener
+ */
+ public void setListener(RequestListener listener);
+
+ /**
+ * Determines if this request is currently being published.
+ *
+ * @return {@code true} if this request is being published, {@code false} otherwise
+ */
+ public boolean isPublishing();
+
+ /**
+ * Starts the publishing process, registering any listeners or timeout handlers, and
+ * adding the request to the publisher queue.
+ */
+ public void startPublishing();
+
+ /**
+ * Starts the publishing process.
+ *
+ * @param token2 token that can be used when publishing, or {@code null} to allocate a
+ * new token
+ */
+ public void startPublishing(QueueToken<PdpMessage> token2);
+
+ /**
+ * Unregisters the listener, cancels the timer, and removes the message from the
+ * queue.
+ */
+ public void stopPublishing();
+
+ /**
+ * Unregisters the listener and cancels the timer.
+ *
+ * @param removeFromQueue {@code true} if the message should be removed from the
+ * queue, {@code false} otherwise
+ * @return the token that was being used to publish the message, or {@code null} if
+ * the request was not being published
+ */
+ public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue);
+
+ /**
+ * Reconfigures the fields based on the {@link #message} type. Suspends publishing,
+ * updates the configuration, and then resumes publishing.
+ *
+ * @param newMessage the new message
+ * @param token2 token to use when publishing, or {@code null} to allocate a new token
+ */
+ public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2);
+
+ /**
+ * Checks the response to ensure it is as expected.
+ *
+ * @param response the response to check
+ * @return an error message, if a fatal error has occurred, {@code null} otherwise
+ */
+ public String checkResponse(PdpStatus response);
+
+ /**
+ * Determines if this request has the same content as another request.
+ *
+ * @param other request against which to compare
+ * @return {@code true} if the requests have the same content, {@code false} otherwise
+ */
+ public boolean isSameContent(Request other);
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java
index 29ad85bc..45ca2db4 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestImpl.java
@@ -18,7 +18,7 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.pap.main.comm;
+package org.onap.policy.pap.main.comm.msgdata;
import lombok.AccessLevel;
import lombok.Getter;
@@ -28,54 +28,60 @@ import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.utils.services.ServiceManager;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.pap.main.comm.msgdata.MessageData;
-import org.onap.policy.pap.main.parameters.RequestDataParams;
+import org.onap.policy.pap.main.comm.QueueToken;
+import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.parameters.RequestParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Request data, the content of which may be changed at any point, possibly triggering a
- * restart of the publishing.
+ * Request data implementation.
*/
-public abstract class RequestData {
- private static final Logger logger = LoggerFactory.getLogger(RequestData.class);
+public abstract class RequestImpl implements Request {
+ private static final Logger logger = LoggerFactory.getLogger(RequestImpl.class);
/**
* Name with which this data is associated, used for logging purposes.
*/
@Getter
- @Setter(AccessLevel.PROTECTED)
- private String name;
+ private final String name;
/**
* The configuration parameters.
*/
- private final RequestDataParams params;
+ @Getter(AccessLevel.PROTECTED)
+ private final RequestParams params;
/**
- * Current retry count.
+ * Used to register/unregister the listener and the timer.
*/
- private int retryCount = 0;
+ private final ServiceManager svcmgr;
/**
- * Used to register/unregister the listener and the timer.
+ * Handles events associated with the request.
+ */
+ @Setter
+ private RequestListener listener;
+
+ /**
+ * Current retry count.
*/
- private ServiceManager svcmgr;
+ @Getter
+ private int retryCount = 0;
/**
- * Wrapper for the message that is currently being published (i.e., {@link #update} or
- * {@link #stateChange}.
+ * The current message.
*/
- @Getter(AccessLevel.PROTECTED)
- private MessageData wrapper;
+ @Getter
+ private PdpMessage message;
/**
- * Used to cancel a timer.
+ * The currently running timer.
*/
private TimerManager.Timer timer;
/**
- * Token that is placed on the queue.
+ * Token that has been placed on the queue.
*/
private QueueToken<PdpMessage> token = null;
@@ -84,72 +90,119 @@ public abstract class RequestData {
* Constructs the object, and validates the parameters.
*
* @param params configuration parameters
+ * @param name the request name, used for logging purposes
+ * @param message the initial message
*
* @throws IllegalArgumentException if a required parameter is not set
*/
- public RequestData(@NonNull RequestDataParams params) {
+ public RequestImpl(@NonNull RequestParams params, @NonNull String name, @NonNull PdpMessage message) {
params.validate();
+ this.name = name;
this.params = params;
+ this.message = message;
+
+ // @formatter:off
+ this.svcmgr = new ServiceManager(name)
+ .addAction("listener",
+ () -> params.getResponseDispatcher()
+ .register(this.message.getRequestId(), this::processResponse),
+ () -> params.getResponseDispatcher().unregister(this.message.getRequestId()))
+ .addAction("timer",
+ () -> timer = params.getTimers().register(this.message.getRequestId(), this::handleTimeout),
+ () -> timer.cancel())
+ .addAction("enqueue",
+ () -> enqueue(),
+ () -> {
+ // do not remove from the queue - token may be re-used
+ });
+ // @formatter:on
}
- /**
- * Starts the publishing process, registering any listeners or timeout handlers, and
- * adding the request to the publisher queue. This should not be invoked until after
- * {@link #configure(MessageData)} is invoked.
- */
+ @Override
+ public void reconfigure(PdpMessage newMessage, QueueToken<PdpMessage> token2) {
+ if (newMessage.getClass() != message.getClass()) {
+ throw new IllegalArgumentException("expecting " + message.getClass().getSimpleName() + " instead of "
+ + newMessage.getClass().getSimpleName());
+ }
+
+ logger.info("reconfiguring {} with new message", getName());
+
+ if (svcmgr.isAlive()) {
+ token = stopPublishing(false);
+ message = newMessage;
+ startPublishing(token2);
+
+ } else {
+ message = newMessage;
+ }
+ }
+
+ @Override
+ public boolean isPublishing() {
+ return svcmgr.isAlive();
+ }
+
+ @Override
public void startPublishing() {
+ startPublishing(null);
+ }
+
+ @Override
+ public void startPublishing(QueueToken<PdpMessage> token2) {
+ if (listener == null) {
+ throw new IllegalStateException("listener has not been set");
+ }
synchronized (params.getModifyLock()) {
- if (!svcmgr.isAlive()) {
+ replaceToken(token2);
+
+ if (svcmgr.isAlive()) {
+ logger.info("{} is already publishing", getName());
+
+ } else {
+ resetRetryCount();
svcmgr.start();
}
}
}
/**
- * Unregisters the listener and cancels the timer.
+ * Replaces the current token with a new token.
+ * @param newToken the new token
*/
- protected void stopPublishing() {
- if (svcmgr.isAlive()) {
- svcmgr.stop();
+ private void replaceToken(QueueToken<PdpMessage> newToken) {
+ if (newToken != null) {
+ if (token == null) {
+ token = newToken;
+
+ } else if (token != newToken) {
+ // already have a token - discard the new token
+ newToken.replaceItem(null);
+ }
}
}
- /**
- * Configures the fields based on the {@link #message} type.
- *
- * @param newWrapper the new message wrapper
- */
- protected void configure(MessageData newWrapper) {
-
- wrapper = newWrapper;
+ @Override
+ public void stopPublishing() {
+ stopPublishing(true);
+ }
- resetRetryCount();
+ @Override
+ public QueueToken<PdpMessage> stopPublishing(boolean removeFromQueue) {
+ if (svcmgr.isAlive()) {
+ svcmgr.stop();
- TimerManager timerManager = wrapper.getTimers();
- String msgType = wrapper.getType();
- String reqid = wrapper.getMessage().getRequestId();
+ if (removeFromQueue) {
+ token.replaceItem(null);
+ token = null;
+ }
+ }
- /*
- * We have to configure the service manager HERE, because it's name changes if the
- * message class changes.
- */
+ QueueToken<PdpMessage> tok = token;
+ token = null;
- // @formatter:off
- this.svcmgr = new ServiceManager(name + " " + msgType)
- .addAction("listener",
- () -> params.getResponseDispatcher().register(reqid, this::processResponse),
- () -> params.getResponseDispatcher().unregister(reqid))
- .addAction("timer",
- () -> timer = timerManager.register(name, this::handleTimeout),
- () -> timer.cancel())
- .addAction("enqueue",
- () -> enqueue(),
- () -> {
- // nothing to "stop"
- });
- // @formatter:on
+ return tok;
}
/**
@@ -157,7 +210,6 @@ public abstract class RequestData {
* if possible. Otherwise, it adds a new token to the queue.
*/
private void enqueue() {
- PdpMessage message = wrapper.getMessage();
if (token != null && token.replaceItem(message) != null) {
// took the other's place in the queue - continue using the token
return;
@@ -171,7 +223,7 @@ public abstract class RequestData {
/**
* Resets the retry count.
*/
- protected void resetRetryCount() {
+ public void resetRetryCount() {
retryCount = 0;
}
@@ -180,8 +232,8 @@ public abstract class RequestData {
*
* @return {@code true} if successful, {@code false} if the limit has been reached
*/
- protected boolean bumpRetryCount() {
- if (retryCount >= wrapper.getMaxRetryCount()) {
+ public boolean bumpRetryCount() {
+ if (retryCount >= params.getMaxRetryCount()) {
return false;
}
@@ -190,15 +242,6 @@ public abstract class RequestData {
}
/**
- * Indicates that the retry count was exhausted. The default method simply invokes
- * {@link #allCompleted()}.
- */
- protected void retryCountExhausted() {
- // remove this request data from the PDP request map
- allCompleted();
- }
-
- /**
* Processes a response received from the PDP.
*
* @param infra infrastructure on which the response was received
@@ -208,26 +251,22 @@ public abstract class RequestData {
private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) {
synchronized (params.getModifyLock()) {
+ String pdpName = response.getName();
+
if (!svcmgr.isAlive()) {
// this particular request must have been discarded
return;
}
- stopPublishing();
-
- if (!isActive()) {
- return;
- }
-
- String reason = wrapper.checkResponse(response);
+ String reason = checkResponse(response);
if (reason != null) {
logger.info("{} PDP data mismatch: {}", getName(), reason);
- wrapper.mismatch(reason);
-
- } else {
- logger.info("{} {} successful", getName(), wrapper.getType());
- wrapper.completed();
+ listener.failure(pdpName, reason);
+ return;
}
+
+ logger.info("{} successful", getName());
+ listener.success(pdpName);
}
}
@@ -246,51 +285,36 @@ public abstract class RequestData {
stopPublishing();
- if (!isActive()) {
- return;
- }
-
- if (isInQueue()) {
- // haven't published yet - just leave it in the queue and reset counts
- logger.info("{} timeout - request still in the queue", getName());
- resetRetryCount();
- startPublishing();
- return;
- }
-
if (!bumpRetryCount()) {
- logger.info("{} timeout - retry count exhausted", getName());
- retryCountExhausted();
+ logger.info("{} timeout - retry count {} exhausted", getName(), retryCount);
+ listener.retryCountExhausted();
return;
}
// re-publish
- logger.info("{} timeout - re-publish", getName());
+ logger.info("{} timeout - re-publish count {}", getName(), retryCount);
+
+ // startPublishing() resets the count, so save & restore it here
+ int count = retryCount;
startPublishing();
+ retryCount = count;
}
}
/**
- * Determines if the current message is still in the queue. Assumes that
- * {@link #startPublishing()} has been invoked and thus {@link #token} has been
- * initialized.
- *
- * @return {@code true} if the current message is in the queue, {@code false}
- * otherwise
+ * Verifies that the name is not null. Also verifies that it matches the name in the
+ * message, if the message has a name.
*/
- private boolean isInQueue() {
- return (token.get() == wrapper.getMessage());
- }
+ @Override
+ public String checkResponse(PdpStatus response) {
+ if (response.getName() == null) {
+ return "null PDP name";
+ }
- /**
- * Determines if this request data is still active.
- *
- * @return {@code true} if this request is active, {@code false} otherwise
- */
- protected abstract boolean isActive();
+ if (message.getName() != null && !message.getName().equals(response.getName())) {
+ return "PDP name does not match";
+ }
- /**
- * Indicates that this entire request has completed.
- */
- protected abstract void allCompleted();
+ return null;
+ }
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java
new file mode 100644
index 00000000..4c53bd64
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/RequestListener.java
@@ -0,0 +1,47 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+/**
+ * Listener for request events.
+ */
+public interface RequestListener {
+
+ /**
+ * Indicates that an invalid response was received from a PDP.
+ *
+ * @param pdpName name of the PDP from which the response was received
+ * @param reason the reason for the mismatch
+ */
+ public void failure(String pdpName, String reason);
+
+ /**
+ * Indicates that a successful response was received from a PDP.
+ *
+ * @param pdpName name of the PDP from which the response was received
+ */
+ public void success(String pdpName);
+
+ /**
+ * Indicates that the retry count was exhausted.
+ */
+ public void retryCountExhausted();
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java
index ecbf5dfa..e38cb0fd 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeReq.java
@@ -20,38 +20,61 @@
package org.onap.policy.pap.main.comm.msgdata;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.RequestParams;
/**
* Wraps a STATE-CHANGE.
*/
-public abstract class StateChangeData extends MessageData {
- private PdpStateChange stateChange;
+public class StateChangeReq extends RequestImpl {
/**
- * Constructs the object.
+ * Constructs the object, and validates the parameters.
*
- * @param message message to be wrapped by this
- * @param params the parameters
+ * @param params configuration parameters
+ * @param name the request name, used for logging purposes
+ * @param message the initial message
+ *
+ * @throws IllegalArgumentException if a required parameter is not set
*/
- public StateChangeData(PdpStateChange message, PdpModifyRequestMapParams params) {
- super(message, params.getParams().getStateChangeParameters().getMaxRetryCount(), params.getStateChangeTimers());
+ public StateChangeReq(RequestParams params, String name, PdpMessage message) {
+ super(params, name, message);
+ }
- stateChange = message;
+ @Override
+ public PdpStateChange getMessage() {
+ return (PdpStateChange) super.getMessage();
}
@Override
public String checkResponse(PdpStatus response) {
- if (!stateChange.getName().equals(response.getName())) {
- return "name does not match";
+ String reason = super.checkResponse(response);
+ if (reason != null) {
+ return reason;
}
- if (response.getState() != stateChange.getState()) {
- return "state is " + response.getState() + ", but expected " + stateChange.getState();
+ PdpStateChange message = getMessage();
+ if (response.getState() != message.getState()) {
+ return "state is " + response.getState() + ", but expected " + message.getState();
}
return null;
}
+
+ @Override
+ public boolean isSameContent(Request other) {
+ if (!(other instanceof StateChangeReq)) {
+ return false;
+ }
+
+ PdpStateChange message = (PdpStateChange) other.getMessage();
+ return (getMessage().getState() == message.getState());
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java
deleted file mode 100644
index eca0b384..00000000
--- a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP PAP
- * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.pap.main.comm.msgdata;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.models.pdp.concepts.PdpUpdate;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
-import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
-
-
-/**
- * Wraps an UPDATE.
- */
-public abstract class UpdateData extends MessageData {
- private PdpUpdate update;
-
- /**
- * Constructs the object.
- *
- * @param message message to be wrapped by this
- * @param params the parameters
- */
- public UpdateData(PdpUpdate message, PdpModifyRequestMapParams params) {
- super(message, params.getParams().getUpdateParameters().getMaxRetryCount(), params.getUpdateTimers());
-
- update = message;
- }
-
- @Override
- public String checkResponse(PdpStatus response) {
- if (!update.getName().equals(response.getName())) {
- return "name does not match";
- }
-
- if (!update.getPdpGroup().equals(response.getPdpGroup())) {
- return "group does not match";
- }
-
- if (!update.getPdpSubgroup().equals(response.getPdpSubgroup())) {
- return "subgroup does not match";
- }
-
- // see if the other has any policies that this does not have
- Set<ToscaPolicyIdentifier> set = new HashSet<>(response.getPolicies());
-
- for (ToscaPolicy policy : update.getPolicies()) {
- set.remove(policy.getIdentifier());
- }
-
- if (!set.isEmpty()) {
- return "policies do not match";
- }
-
- return null;
- }
-}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java
new file mode 100644
index 00000000..0f6d73fc
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateReq.java
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
+import org.onap.policy.pap.main.parameters.RequestParams;
+
+
+/**
+ * Wraps an UPDATE.
+ */
+public class UpdateReq extends RequestImpl {
+
+ /**
+ * Constructs the object, and validates the parameters.
+ *
+ * @param params configuration parameters
+ * @param name the request name, used for logging purposes
+ * @param message the initial message
+ *
+ * @throws IllegalArgumentException if a required parameter is not set
+ */
+ public UpdateReq(RequestParams params, String name, PdpMessage message) {
+ super(params, name, message);
+ }
+
+ @Override
+ public PdpUpdate getMessage() {
+ return (PdpUpdate) super.getMessage();
+ }
+
+ @Override
+ public String checkResponse(PdpStatus response) {
+ String reason = super.checkResponse(response);
+ if (reason != null) {
+ return reason;
+ }
+
+ PdpUpdate message = (PdpUpdate) getMessage();
+ if (!StringUtils.equals(message.getPdpGroup(), response.getPdpGroup())) {
+ return "group does not match";
+ }
+
+ if (!StringUtils.equals(message.getPdpSubgroup(), response.getPdpSubgroup())) {
+ return "subgroup does not match";
+ }
+
+ // see if the policies match
+ Set<ToscaPolicyIdentifier> set1 = new HashSet<>(response.getPolicies());
+ Set<ToscaPolicyIdentifier> set2 = new HashSet<>(
+ message.getPolicies().stream().map(ToscaPolicy::getIdentifier).collect(Collectors.toSet()));
+
+ if (!set1.equals(set2)) {
+ return "policies do not match";
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean isSameContent(Request other) {
+ if (!(other instanceof UpdateReq)) {
+ return false;
+ }
+
+ PdpUpdate first = getMessage();
+ PdpUpdate second = (PdpUpdate) other.getMessage();
+
+ if (!StringUtils.equals(first.getPdpGroup(), second.getPdpGroup())) {
+ return false;
+ }
+
+ if (!StringUtils.equals(first.getPdpSubgroup(), second.getPdpSubgroup())) {
+ return false;
+ }
+
+ // see if the policies are the same
+ Set<ToscaPolicy> set1 = new HashSet<>(first.getPolicies());
+ Set<ToscaPolicy> set2 = new HashSet<>(second.getPolicies());
+
+ return set1.equals(set2);
+ }
+
+ @Override
+ public int getPriority() {
+ return 1;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
index 2c17a0b2..2f74bf3d 100644
--- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
@@ -23,6 +23,7 @@ package org.onap.policy.pap.main.parameters;
import lombok.Getter;
import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
@@ -31,10 +32,14 @@ import org.onap.policy.pap.main.comm.TimerManager;
* Parameters needed to create a {@link PdpModifyRequestMapParams}.
*/
@Getter
-public class PdpModifyRequestMapParams extends RequestDataParams {
+public class PdpModifyRequestMapParams {
+ private Publisher publisher;
+ private RequestIdDispatcher<PdpStatus> responseDispatcher;
+ private Object modifyLock;
private PdpParameters params;
private TimerManager updateTimers;
private TimerManager stateChangeTimers;
+ private PolicyModelsProviderFactoryWrapper daoFactory;
public PdpModifyRequestMapParams setParams(PdpParameters params) {
this.params = params;
@@ -51,27 +56,41 @@ public class PdpModifyRequestMapParams extends RequestDataParams {
return this;
}
- @Override
+ public PdpModifyRequestMapParams setDaoFactory(PolicyModelsProviderFactoryWrapper daoFactory) {
+ this.daoFactory = daoFactory;
+ return this;
+ }
+
public PdpModifyRequestMapParams setPublisher(Publisher publisher) {
- super.setPublisher(publisher);
+ this.publisher = publisher;
return this;
}
- @Override
public PdpModifyRequestMapParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
- super.setResponseDispatcher(responseDispatcher);
+ this.responseDispatcher = responseDispatcher;
return this;
}
- @Override
public PdpModifyRequestMapParams setModifyLock(Object modifyLock) {
- super.setModifyLock(modifyLock);
+ this.modifyLock = modifyLock;
return this;
}
- @Override
+ /**
+ * Validates the parameters.
+ */
public void validate() {
- super.validate();
+ if (publisher == null) {
+ throw new IllegalArgumentException("missing publisher");
+ }
+
+ if (responseDispatcher == null) {
+ throw new IllegalArgumentException("missing responseDispatcher");
+ }
+
+ if (modifyLock == null) {
+ throw new IllegalArgumentException("missing modifyLock");
+ }
if (params == null) {
throw new IllegalArgumentException("missing PDP parameters");
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java
index ea4b02ca..b9083864 100644
--- a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestParams.java
@@ -24,33 +24,46 @@ import lombok.Getter;
import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.pap.main.comm.Publisher;
-import org.onap.policy.pap.main.comm.RequestData;
+import org.onap.policy.pap.main.comm.TimerManager;
/**
- * Parameters needed to create a {@link RequestData}.
+ * Parameters needed to create a Request.
*/
@Getter
-public class RequestDataParams {
+public class RequestParams {
private Publisher publisher;
private RequestIdDispatcher<PdpStatus> responseDispatcher;
private Object modifyLock;
+ private TimerManager timers;
+ private int maxRetryCount;
- public RequestDataParams setPublisher(Publisher publisher) {
+
+ public RequestParams setPublisher(Publisher publisher) {
this.publisher = publisher;
return this;
}
- public RequestDataParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
+ public RequestParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
this.responseDispatcher = responseDispatcher;
return this;
}
- public RequestDataParams setModifyLock(Object modifyLock) {
+ public RequestParams setModifyLock(Object modifyLock) {
this.modifyLock = modifyLock;
return this;
}
+ public RequestParams setTimers(TimerManager timers) {
+ this.timers = timers;
+ return this;
+ }
+
+ public RequestParams setMaxRetryCount(int maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ return this;
+ }
+
/**
* Validates the parameters.
*/
@@ -66,5 +79,9 @@ public class RequestDataParams {
if (modifyLock == null) {
throw new IllegalArgumentException("missing modifyLock");
}
+
+ if (timers == null) {
+ throw new IllegalArgumentException("missing timers");
+ }
}
}