summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java377
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java136
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java75
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java296
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java310
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java104
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java57
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java78
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java88
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java70
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java84
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java575
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java265
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java87
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java476
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java112
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java400
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java92
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java122
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java169
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java110
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java87
-rw-r--r--main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java9
23 files changed, 4167 insertions, 12 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
new file mode 100644
index 00000000..f2ebca5c
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java
@@ -0,0 +1,377 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.comm.msgdata.StateChangeData;
+import org.onap.policy.pap.main.comm.msgdata.UpdateData;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+/**
+ * Maps a PDP name to requests that modify PDPs.
+ */
+public class PdpModifyRequestMap {
+
+ /**
+ * Maps a PDP name to its request data. An entry is removed once all of the requests
+ * within the data have been completed.
+ */
+ private final Map<String, ModifyReqData> name2data = new HashMap<>();
+
+ /**
+ * PDP modification lock.
+ */
+ private final Object modifyLock;
+
+ /**
+ * The configuration parameters.
+ */
+ private final PdpModifyRequestMapParams params;
+
+ /**
+ * Constructs the data.
+ *
+ * @param params configuration parameters
+ *
+ * @throws IllegalArgumentException if a required parameter is not set
+ */
+ public PdpModifyRequestMap(PdpModifyRequestMapParams params) {
+ params.validate();
+
+ this.params = params;
+ this.modifyLock = params.getModifyLock();
+ }
+
+ /**
+ * Adds an UPDATE request to the map.
+ *
+ * @param update the UPDATE request or {@code null}
+ */
+ public void addRequest(PdpUpdate update) {
+ addRequest(update, null);
+ }
+
+ /**
+ * Adds STATE-CHANGE request to the map.
+ *
+ * @param stateChange the STATE-CHANGE request or {@code null}
+ */
+ public void addRequest(PdpStateChange stateChange) {
+ addRequest(null, stateChange);
+ }
+
+ /**
+ * Adds a pair of requests to the map.
+ *
+ * @param update the UPDATE request or {@code null}
+ * @param stateChange the STATE-CHANGE request or {@code null}
+ */
+ public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
+ if (update == null && stateChange == null) {
+ return;
+ }
+
+ synchronized (modifyLock) {
+ String pdpName = getPdpName(update, stateChange);
+
+ ModifyReqData data = name2data.get(pdpName);
+ if (data != null) {
+ // update the existing request
+ data.add(update);
+ data.add(stateChange);
+
+ } else {
+ data = makeRequestData(update, stateChange);
+ name2data.put(pdpName, data);
+ data.startPublishing();
+ }
+ }
+ }
+
+ /**
+ * Gets the PDP name from two requests.
+ *
+ * @param update the update request, or {@code null}
+ * @param stateChange the state-change request, or {@code null}
+ * @return the PDP name, or {@code null} if both requests are {@code null}
+ */
+ private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) {
+ String pdpName;
+
+ if (update != null) {
+ if ((pdpName = update.getName()) == null) {
+ throw new IllegalArgumentException("missing name in " + update);
+ }
+
+ if (stateChange != null && !pdpName.equals(stateChange.getName())) {
+ throw new IllegalArgumentException(
+ "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange);
+ }
+
+ } else {
+ if ((pdpName = stateChange.getName()) == null) {
+ throw new IllegalArgumentException("missing name in " + stateChange);
+ }
+ }
+
+ return pdpName;
+ }
+
+ /**
+ * Determines if two requests are the "same", which is does not necessarily mean
+ * "equals".
+ *
+ * @param first first request to check
+ * @param second second request to check
+ * @return {@code true} if the requests are the "same", {@code false} otherwise
+ */
+ protected static boolean isSame(PdpUpdate first, PdpUpdate second) {
+ if (first.getPolicies().size() != second.getPolicies().size()) {
+ return false;
+ }
+
+ if (!first.getPdpGroup().equals(second.getPdpGroup())) {
+ return false;
+ }
+
+ if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) {
+ return false;
+ }
+
+ // see if the other has any policies that this does not have
+ ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies());
+ lst.removeAll(first.getPolicies());
+
+ return lst.isEmpty();
+ }
+
+ /**
+ * Determines if two requests are the "same", which is does not necessarily mean
+ * "equals".
+ *
+ * @param first first request to check
+ * @param second second request to check
+ * @return {@code true} if this update subsumes the other, {@code false} otherwise
+ */
+ protected static boolean isSame(PdpStateChange first, PdpStateChange second) {
+ return (first.getState() == second.getState());
+ }
+
+ /**
+ * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The
+ * UPDATE is always published before the STATE-CHANGE. In addition, both requests may
+ * be changed at any point, possibly triggering a restart of the publishing.
+ */
+ public class ModifyReqData extends RequestData {
+
+ /**
+ * The UPDATE message to be published, or {@code null}.
+ */
+ private PdpUpdate update;
+
+ /**
+ * The STATE-CHANGE message to be published, or {@code null}.
+ */
+ private PdpStateChange stateChange;
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param newUpdate the UPDATE message to be sent, or {@code null}
+ * @param newState the STATE-CHANGE message to be sent, or {@code null}
+ */
+ public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) {
+ super(params);
+
+ if (newUpdate != null) {
+ this.stateChange = newState;
+ setName(newUpdate.getName());
+ update = newUpdate;
+ configure(new ModUpdateData(newUpdate));
+
+ } else {
+ this.update = null;
+ setName(newState.getName());
+ stateChange = newState;
+ configure(new ModStateChangeData(newState));
+ }
+ }
+
+ /**
+ * Determines if this request is still in the map.
+ */
+ @Override
+ protected boolean isActive() {
+ return (name2data.get(getName()) == this);
+ }
+
+ /**
+ * Removes this request from the map.
+ */
+ @Override
+ protected void allCompleted() {
+ name2data.remove(getName(), this);
+ }
+
+ /**
+ * Adds an UPDATE to the request data, replacing any existing UPDATE, if
+ * appropriate. If the UPDATE is replaced, then publishing is restarted.
+ *
+ * @param newRequest the new UPDATE request
+ */
+ private void add(PdpUpdate newRequest) {
+ if (newRequest == null) {
+ return;
+ }
+
+ synchronized (modifyLock) {
+ if (update != null && isSame(update, newRequest)) {
+ // already have this update - discard it
+ return;
+ }
+
+ // must restart from scratch
+ stopPublishing();
+
+ update = newRequest;
+ configure(new ModUpdateData(newRequest));
+
+ startPublishing();
+ }
+ }
+
+ /**
+ * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if
+ * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing
+ * the STATE-CHANGE, then publishing is restarted.
+ *
+ * @param newRequest the new STATE-CHANGE request
+ */
+ private void add(PdpStateChange newRequest) {
+ if (newRequest == null) {
+ return;
+ }
+
+ synchronized (modifyLock) {
+ if (stateChange != null && isSame(stateChange, newRequest)) {
+ // already have this update - discard it
+ return;
+ }
+
+ if (getWrapper() instanceof StateChangeData) {
+ // we were publishing STATE-CHANGE, thus must restart it
+ stopPublishing();
+
+ stateChange = newRequest;
+ configure(new ModStateChangeData(newRequest));
+
+ startPublishing();
+
+ } else {
+ // haven't started publishing STATE-CHANGE yet, just replace it
+ stateChange = newRequest;
+ }
+ }
+ }
+
+ /**
+ * Indicates that the retry count was exhausted.
+ */
+ protected void retryCountExhausted() {
+ // remove this request data from the PDP request map
+ allCompleted();
+
+ // TODO what to do?
+ }
+
+ /**
+ * Indicates that a response did not match the data.
+ *
+ * @param reason the reason for the mismatch
+ */
+ protected void mismatch(String reason) {
+ // remove this request data from the PDP request map
+ allCompleted();
+
+ // TODO what to do?
+ }
+
+ /**
+ * Wraps an UPDATE.
+ */
+ private class ModUpdateData extends UpdateData {
+
+ public ModUpdateData(PdpUpdate message) {
+ super(message, params);
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ ModifyReqData.this.mismatch(reason);
+ }
+
+ @Override
+ public void completed() {
+ if (stateChange == null) {
+ // no STATE-CHANGE request - we're done
+ allCompleted();
+
+ } else {
+ // now process the STATE-CHANGE request
+ configure(new ModStateChangeData(stateChange));
+ startPublishing();
+ }
+ }
+ }
+
+ /**
+ * Wraps a STATE-CHANGE.
+ */
+ private class ModStateChangeData extends StateChangeData {
+
+ public ModStateChangeData(PdpStateChange message) {
+ super(message, params);
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ ModifyReqData.this.mismatch(reason);
+ }
+
+ @Override
+ public void completed() {
+ allCompleted();
+ }
+ }
+ }
+
+ // these may be overridden by junit tests
+
+ protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
+ return new ModifyReqData(update, stateChange);
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
new file mode 100644
index 00000000..6032d17e
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java
@@ -0,0 +1,136 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.pap.main.PolicyPapException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publishes messages to a topic. Maintains a queue of references to data that is to be
+ * published. Once the publisher removes a reference from a queue, it sets it to
+ * {@link null} to indicate that it is being processed. Until it has been set to
+ * {@link null}, clients are free to atomically update the reference to new values, thus
+ * maintaining their place in the queue.
+ *
+ * <p>This class has not been tested for multiple threads invoking {@link #run()}
+ * simultaneously.
+ */
+public class Publisher implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
+
+ /**
+ * Used to send to the topic.
+ */
+ private final TopicSinkClient client;
+
+ /**
+ * Request queue. The references may contain {@code null}.
+ */
+ private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>();
+
+ /**
+ * Set to {@code true} to cause the publisher to stop running.
+ */
+ private volatile boolean stopNow = false;
+
+ /**
+ * Constructs the object.
+ *
+ * @param topic name of the topic to which to publish
+ * @throws PolicyPapException if the topic sink does not exist
+ */
+ public Publisher(String topic) throws PolicyPapException {
+ try {
+ this.client = new TopicSinkClient(topic);
+ } catch (TopicSinkClientException e) {
+ throw new PolicyPapException(e);
+ }
+ }
+
+ /**
+ * Stops the publisher, if it's running.
+ */
+ public void stop() {
+ stopNow = true;
+
+ // add an empty reference so the thread doesn't block on the queue
+ queue.add(new QueueToken<>(null));
+ }
+
+ /**
+ * Adds an item to the queue. The referenced objects are assumed to be POJOs and will
+ * be converted to JSON via the {@link StandardCoder} prior to publishing.
+ *
+ * @param ref reference to the message to be published
+ */
+ public void enqueue(QueueToken<PdpMessage> ref) {
+ queue.add(ref);
+ }
+
+ /**
+ * Continuously publishes items in the queue until {@link #stop()} is invoked.
+ */
+ @Override
+ public void run() {
+ for (;;) {
+ QueueToken<PdpMessage> token = getNext();
+
+ if (stopNow) {
+ // unblock any other publisher threads
+ queue.offer(new QueueToken<>(null));
+ break;
+ }
+
+ PdpMessage data = token.replaceItem(null);
+ if (data == null) {
+ continue;
+ }
+
+ client.send(data);
+ }
+ }
+
+ /**
+ * Gets the next item from the queue. If the thread is interrupted, then it sets
+ * {@link #stopNow}.
+ *
+ * @return the next item, or a reference containing {@code null} if this is
+ * interrupted
+ */
+ private QueueToken<PdpMessage> getNext() {
+ try {
+ return queue.take();
+
+ } catch (InterruptedException e) {
+ logger.warn("Publisher stopping due to interrupt");
+ stopNow = true;
+ Thread.currentThread().interrupt();
+ return new QueueToken<>(null);
+ }
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java b/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java
new file mode 100644
index 00000000..a68b7c05
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Token that can be placed within a publisher's queue. The item that a token references
+ * may be replaced any time up until it is set to {@code null}. Once it has been set to
+ * {@code null}, it cannot be replaced.
+ *
+ * @param <T> type of object referenced by the token
+ */
+public class QueueToken<T> {
+
+ /**
+ * Wraps the item.
+ */
+ private final AtomicReference<T> ref;
+
+ /**
+ * Constructs the object.
+ *
+ * @param item initial token item
+ */
+ public QueueToken(T item) {
+ ref = new AtomicReference<>(item);
+ }
+
+ /**
+ * Gets the item referenced by this token.
+ *
+ * @return the item referenced by this token
+ */
+ public final T get() {
+ return ref.get();
+ }
+
+ /**
+ * Replaces the token's item. If the current item is {@code null}, then it is left
+ * unchanged.
+ *
+ * @param newItem the new item
+ * @return the original item
+ */
+ public T replaceItem(T newItem) {
+ T oldItem;
+ while ((oldItem = ref.get()) != null) {
+ if (ref.compareAndSet(oldItem, newItem)) {
+ break;
+ }
+ }
+
+ // it was already null, or we successfully replaced the item
+ return oldItem;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java b/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java
new file mode 100644
index 00000000..29ad85bc
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java
@@ -0,0 +1,296 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.msgdata.MessageData;
+import org.onap.policy.pap.main.parameters.RequestDataParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Request data, the content of which may be changed at any point, possibly triggering a
+ * restart of the publishing.
+ */
+public abstract class RequestData {
+ private static final Logger logger = LoggerFactory.getLogger(RequestData.class);
+
+ /**
+ * Name with which this data is associated, used for logging purposes.
+ */
+ @Getter
+ @Setter(AccessLevel.PROTECTED)
+ private String name;
+
+ /**
+ * The configuration parameters.
+ */
+ private final RequestDataParams params;
+
+ /**
+ * Current retry count.
+ */
+ private int retryCount = 0;
+
+ /**
+ * Used to register/unregister the listener and the timer.
+ */
+ private ServiceManager svcmgr;
+
+ /**
+ * Wrapper for the message that is currently being published (i.e., {@link #update} or
+ * {@link #stateChange}.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private MessageData wrapper;
+
+ /**
+ * Used to cancel a timer.
+ */
+ private TimerManager.Timer timer;
+
+ /**
+ * Token that is placed on the queue.
+ */
+ private QueueToken<PdpMessage> token = null;
+
+
+ /**
+ * Constructs the object, and validates the parameters.
+ *
+ * @param params configuration parameters
+ *
+ * @throws IllegalArgumentException if a required parameter is not set
+ */
+ public RequestData(@NonNull RequestDataParams params) {
+ params.validate();
+
+ this.params = params;
+ }
+
+ /**
+ * Starts the publishing process, registering any listeners or timeout handlers, and
+ * adding the request to the publisher queue. This should not be invoked until after
+ * {@link #configure(MessageData)} is invoked.
+ */
+ public void startPublishing() {
+
+ synchronized (params.getModifyLock()) {
+ if (!svcmgr.isAlive()) {
+ svcmgr.start();
+ }
+ }
+ }
+
+ /**
+ * Unregisters the listener and cancels the timer.
+ */
+ protected void stopPublishing() {
+ if (svcmgr.isAlive()) {
+ svcmgr.stop();
+ }
+ }
+
+ /**
+ * Configures the fields based on the {@link #message} type.
+ *
+ * @param newWrapper the new message wrapper
+ */
+ protected void configure(MessageData newWrapper) {
+
+ wrapper = newWrapper;
+
+ resetRetryCount();
+
+ TimerManager timerManager = wrapper.getTimers();
+ String msgType = wrapper.getType();
+ String reqid = wrapper.getMessage().getRequestId();
+
+ /*
+ * We have to configure the service manager HERE, because it's name changes if the
+ * message class changes.
+ */
+
+ // @formatter:off
+ this.svcmgr = new ServiceManager(name + " " + msgType)
+ .addAction("listener",
+ () -> params.getResponseDispatcher().register(reqid, this::processResponse),
+ () -> params.getResponseDispatcher().unregister(reqid))
+ .addAction("timer",
+ () -> timer = timerManager.register(name, this::handleTimeout),
+ () -> timer.cancel())
+ .addAction("enqueue",
+ () -> enqueue(),
+ () -> {
+ // nothing to "stop"
+ });
+ // @formatter:on
+ }
+
+ /**
+ * Enqueues the current message with the publisher, putting it into the queue token,
+ * if possible. Otherwise, it adds a new token to the queue.
+ */
+ private void enqueue() {
+ PdpMessage message = wrapper.getMessage();
+ if (token != null && token.replaceItem(message) != null) {
+ // took the other's place in the queue - continue using the token
+ return;
+ }
+
+ // couldn't take the other's place - add our own token to the queue
+ token = new QueueToken<>(message);
+ params.getPublisher().enqueue(token);
+ }
+
+ /**
+ * Resets the retry count.
+ */
+ protected void resetRetryCount() {
+ retryCount = 0;
+ }
+
+ /**
+ * Bumps the retry count.
+ *
+ * @return {@code true} if successful, {@code false} if the limit has been reached
+ */
+ protected boolean bumpRetryCount() {
+ if (retryCount >= wrapper.getMaxRetryCount()) {
+ return false;
+ }
+
+ retryCount++;
+ return true;
+ }
+
+ /**
+ * Indicates that the retry count was exhausted. The default method simply invokes
+ * {@link #allCompleted()}.
+ */
+ protected void retryCountExhausted() {
+ // remove this request data from the PDP request map
+ allCompleted();
+ }
+
+ /**
+ * Processes a response received from the PDP.
+ *
+ * @param infra infrastructure on which the response was received
+ * @param topic topic on which the response was received
+ * @param response the response
+ */
+ private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) {
+
+ synchronized (params.getModifyLock()) {
+ if (!svcmgr.isAlive()) {
+ // this particular request must have been discarded
+ return;
+ }
+
+ stopPublishing();
+
+ if (!isActive()) {
+ return;
+ }
+
+ String reason = wrapper.checkResponse(response);
+ if (reason != null) {
+ logger.info("{} PDP data mismatch: {}", getName(), reason);
+ wrapper.mismatch(reason);
+
+ } else {
+ logger.info("{} {} successful", getName(), wrapper.getType());
+ wrapper.completed();
+ }
+ }
+ }
+
+ /**
+ * Handles a timeout.
+ *
+ * @param timerName the timer timer
+ */
+ private void handleTimeout(String timerName) {
+
+ synchronized (params.getModifyLock()) {
+ if (!svcmgr.isAlive()) {
+ // this particular request must have been discarded
+ return;
+ }
+
+ stopPublishing();
+
+ if (!isActive()) {
+ return;
+ }
+
+ if (isInQueue()) {
+ // haven't published yet - just leave it in the queue and reset counts
+ logger.info("{} timeout - request still in the queue", getName());
+ resetRetryCount();
+ startPublishing();
+ return;
+ }
+
+ if (!bumpRetryCount()) {
+ logger.info("{} timeout - retry count exhausted", getName());
+ retryCountExhausted();
+ return;
+ }
+
+ // re-publish
+ logger.info("{} timeout - re-publish", getName());
+ startPublishing();
+ }
+ }
+
+ /**
+ * Determines if the current message is still in the queue. Assumes that
+ * {@link #startPublishing()} has been invoked and thus {@link #token} has been
+ * initialized.
+ *
+ * @return {@code true} if the current message is in the queue, {@code false}
+ * otherwise
+ */
+ private boolean isInQueue() {
+ return (token.get() == wrapper.getMessage());
+ }
+
+ /**
+ * Determines if this request data is still active.
+ *
+ * @return {@code true} if this request is active, {@code false} otherwise
+ */
+ protected abstract boolean isActive();
+
+ /**
+ * Indicates that this entire request has completed.
+ */
+ protected abstract void allCompleted();
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java b/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java
new file mode 100644
index 00000000..9748b0b5
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java
@@ -0,0 +1,310 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manager of timers. All of the timers for a given manager have the same wait time, which
+ * makes it possible to use a linked hash map to track the timers. As a result, timers can
+ * be quickly added and removed. In addition, the expiration time of any new timer is
+ * always greater than or equal to the timers that are already in the map. Consequently,
+ * the map's iterator will go in ascending order from the minimum expiration time to
+ * maximum expiration time.
+ *
+ * <p>This class has not been tested for multiple threads invoking {@link #run()}
+ * simultaneously.
+ */
+public class TimerManager implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(TimerManager.class);
+
+ /**
+ * Name of this manager, used for logging purposes.
+ */
+ private final String name;
+
+ /**
+ * Time that each new timer should wait.
+ */
+ private final long waitTimeMs;
+
+ /**
+ * When the map is empty, the timer thread will block waiting for this semaphore. When
+ * a new timer is added to the map, the semaphore will be released, thus allowing the
+ * timer thread to progress.
+ */
+ private final Semaphore sem = new Semaphore(0);
+
+ /**
+ * This is decremented to indicate that this manager should be stopped.
+ */
+ private final CountDownLatch stopper = new CountDownLatch(1);
+
+ /**
+ * Used to lock updates to the map.
+ */
+ private final Object lockit = new Object();
+
+ /**
+ * Maps a timer name to a timer.
+ */
+ private final Map<String, Timer> name2timer = new LinkedHashMap<>();
+
+ /**
+ * Constructs the object.
+ *
+ * @param name name of this manager, used for logging purposes
+ * @param waitTimeMs time that each new timer should wait
+ */
+ public TimerManager(String name, long waitTimeMs) {
+ this.name = name;
+ this.waitTimeMs = waitTimeMs;
+ }
+
+ /**
+ * Stops the timer thread.
+ */
+ public void stop() {
+ logger.info("timer manager {} stopping", name);
+
+ // Note: Must decrement the latch BEFORE releasing the semaphore
+ stopper.countDown();
+ sem.release();
+ }
+
+ /**
+ * Registers a timer with the given name. When the timer expires, it is automatically
+ * unregistered and then executed.
+ *
+ * @param timerName name of the timer to register
+ * @param action action to take when the timer expires; the "timerName" is passed as
+ * the only argument
+ * @return the timer
+ */
+ public Timer register(String timerName, Consumer<String> action) {
+
+ synchronized (lockit) {
+ Timer timer = new Timer(timerName, action);
+
+ // always remove existing entry so that new entry goes at the end of the map
+ name2timer.remove(timerName);
+ name2timer.put(timerName, timer);
+
+ logger.info("{} timer registered {}", name, timer);
+
+ if (name2timer.size() == 1) {
+ // release the timer thread
+ sem.release();
+ }
+
+ return timer;
+ }
+ }
+
+ /**
+ * Continuously processes timers until {@link #stop()} is invoked.
+ */
+ @Override
+ public void run() {
+ logger.info("timer manager {} started", name);
+
+ while (stopper.getCount() > 0) {
+
+ try {
+ sem.acquire();
+ sem.drainPermits();
+
+ processTimers();
+
+ } catch (InterruptedException e) {
+ logger.warn("timer manager {} stopping due to interrupt", name);
+ stopper.countDown();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ logger.info("timer manager {} stopped", name);
+ }
+
+ /**
+ * Process all timers, continuously, as long as a timer remains in the map (and
+ * {@link #stop()} has not been called).
+ *
+ * @throws InterruptedException if the thread is interrupted
+ */
+ private void processTimers() throws InterruptedException {
+ Timer timer;
+ while ((timer = getNextTimer()) != null && stopper.getCount() > 0) {
+ processTimer(timer);
+ }
+ }
+
+ /**
+ * Gets the timer that will expire first.
+ *
+ * @return the timer that will expire first, or {@code null} if there are no timers
+ */
+ private Timer getNextTimer() {
+
+ synchronized (lockit) {
+ if (name2timer.isEmpty()) {
+ return null;
+ }
+
+ // use an iterator to get the first timer in the map
+ return name2timer.values().iterator().next();
+ }
+ }
+
+ /**
+ * Process a timer, waiting until it expires, unregistering it, and then executing its
+ * action.
+ *
+ * @param timer timer to process
+ * @throws InterruptedException if the thread is interrupted
+ */
+ private void processTimer(Timer timer) throws InterruptedException {
+ timer.await();
+
+ if (stopper.getCount() == 0) {
+ // stop() was called
+ return;
+ }
+
+ if (!timer.cancel()) {
+ // timer was cancelled while we were waiting
+ return;
+ }
+
+
+ // run the timer
+ try {
+ logger.info("{} timer expired {}", TimerManager.this.name, timer);
+ timer.runner.accept(timer.name);
+ } catch (RuntimeException e) {
+ logger.warn("{} timer threw an exception {}", TimerManager.this.name, timer, e);
+ }
+ }
+
+ /**
+ * Timer info.
+ */
+ public class Timer {
+ /**
+ * The timer's name.
+ */
+ private String name;
+
+ /**
+ * Time, in milliseconds, when the timer will expire.
+ */
+ private long expireMs;
+
+ /**
+ * Action to take when the timer expires.
+ */
+ private Consumer<String> runner;
+
+
+ private Timer(String name, Consumer<String> runner2) {
+ this.name = name;
+ this.expireMs = waitTimeMs + currentTimeMillis();
+ this.runner = runner2;
+ }
+
+ private void await() throws InterruptedException {
+ // wait for it to expire, if necessary
+ long tleft = expireMs - currentTimeMillis();
+ if (tleft > 0) {
+ logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this);
+ sleep(tleft);
+ }
+ }
+
+ /**
+ * Cancels the timer.
+ *
+ * @return {@code true} if the timer was cancelled, {@code false} if the timer was
+ * not running
+ */
+ public boolean cancel() {
+
+ AtomicBoolean wasPresent = new AtomicBoolean(false);
+
+ synchronized (lockit) {
+
+ name2timer.computeIfPresent(name, (key, val) -> {
+
+ if (val == this) {
+ wasPresent.set(true);
+ return null;
+
+ } else {
+ return val;
+ }
+ });
+
+ if (!wasPresent.get()) {
+ // have a new timer in the map - ignore "this" timer
+ logger.info("{} timer replaced {}", TimerManager.this.name, this);
+ return false;
+ }
+
+ logger.debug("{} timer cancelled {}", TimerManager.this.name, this);
+ return true;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Timer [name=" + name + ", expireMs=" + expireMs + "]";
+ }
+ }
+
+ // these may be overridden by junit tests
+
+ /**
+ * Gets the current time, in milli-seconds.
+ *
+ * @return the current time, in milli-seconds
+ */
+ protected long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * "Sleeps" for a bit, stopping if {@link #stop()} is invoked.
+ *
+ * @param timeMs time, in milli-seconds, to sleep
+ * @throws InterruptedException if this thread is interrupted while sleeping
+ */
+ protected void sleep(long timeMs) throws InterruptedException {
+ stopper.await(timeMs, TimeUnit.MILLISECONDS);
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java
new file mode 100644
index 00000000..aa288f7d
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+
+/**
+ * Wraps a message, providing methods appropriate to the message type.
+ */
+public abstract class MessageData {
+ private final PdpMessage message;
+ private final int maxRetries;
+ private final TimerManager timers;
+
+ /**
+ * Constructs the object.
+ *
+ * @param message message to be wrapped by this
+ * @param maxRetries max number of retries
+ * @param timers the timer manager for messages of this type
+ */
+ public MessageData(PdpMessage message, int maxRetries, TimerManager timers) {
+ this.message = message;
+ this.maxRetries = maxRetries;
+ this.timers = timers;
+ }
+
+ /**
+ * Gets the wrapped message.
+ *
+ * @return the wrapped message
+ */
+ public PdpMessage getMessage() {
+ return message;
+ }
+
+ /**
+ * Gets a string, suitable for logging, identifying the message type.
+ *
+ * @return the message type
+ */
+ public String getType() {
+ return message.getClass().getSimpleName();
+ }
+
+ /**
+ * Gets the maximum retry count for the particular message type.
+ *
+ * @return the maximum retry count
+ */
+ public int getMaxRetryCount() {
+ return maxRetries;
+ }
+
+ /**
+ * Gets the timer manager for the particular message type.
+ *
+ * @return the timer manager
+ */
+ public TimerManager getTimers() {
+ return timers;
+ }
+
+ /**
+ * Indicates that the response did not match what was expected.
+ *
+ * @param reason the reason for the mismatch
+ */
+ public abstract void mismatch(String reason);
+
+ /**
+ * Indicates that processing of this particular message has completed successfully.
+ */
+ public abstract void completed();
+
+ /**
+ * Checks the response to ensure it is as expected.
+ *
+ * @param response the response to check
+ * @return an error message, if a fatal error has occurred, {@code null} otherwise
+ */
+ public abstract String checkResponse(PdpStatus response);
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java
new file mode 100644
index 00000000..ecbf5dfa
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+/**
+ * Wraps a STATE-CHANGE.
+ */
+public abstract class StateChangeData extends MessageData {
+ private PdpStateChange stateChange;
+
+ /**
+ * Constructs the object.
+ *
+ * @param message message to be wrapped by this
+ * @param params the parameters
+ */
+ public StateChangeData(PdpStateChange message, PdpModifyRequestMapParams params) {
+ super(message, params.getParams().getStateChangeParameters().getMaxRetryCount(), params.getStateChangeTimers());
+
+ stateChange = message;
+ }
+
+ @Override
+ public String checkResponse(PdpStatus response) {
+ if (!stateChange.getName().equals(response.getName())) {
+ return "name does not match";
+ }
+
+ if (response.getState() != stateChange.getState()) {
+ return "state is " + response.getState() + ", but expected " + stateChange.getState();
+ }
+
+ return null;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java
new file mode 100644
index 00000000..904c8237
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+
+
+/**
+ * Wraps an UPDATE.
+ */
+public abstract class UpdateData extends MessageData {
+ private PdpUpdate update;
+
+ /**
+ * Constructs the object.
+ *
+ * @param message message to be wrapped by this
+ * @param params the parameters
+ */
+ public UpdateData(PdpUpdate message, PdpModifyRequestMapParams params) {
+ super(message, params.getParams().getUpdateParameters().getMaxRetryCount(), params.getUpdateTimers());
+
+ update = message;
+ }
+
+ @Override
+ public String checkResponse(PdpStatus response) {
+ if (!update.getName().equals(response.getName())) {
+ return "name does not match";
+ }
+
+ if (!update.getPdpGroup().equals(response.getPdpGroup())) {
+ return "group does not match";
+ }
+
+ if (!update.getPdpSubgroup().equals(response.getPdpSubgroup())) {
+ return "subgroup does not match";
+ }
+
+ // see if the other has any policies that this does not have
+ ArrayList<ToscaPolicy> lst = new ArrayList<>(response.getPolicies());
+ List<ToscaPolicy> mypolicies = update.getPolicies();
+
+ if (mypolicies.size() != lst.size()) {
+ return "policies do not match";
+ }
+
+ lst.removeAll(update.getPolicies());
+ if (!lst.isEmpty()) {
+ return "policies do not match";
+ }
+
+ return null;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
new file mode 100644
index 00000000..2c17a0b2
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java
@@ -0,0 +1,88 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+
+/**
+ * Parameters needed to create a {@link PdpModifyRequestMapParams}.
+ */
+@Getter
+public class PdpModifyRequestMapParams extends RequestDataParams {
+ private PdpParameters params;
+ private TimerManager updateTimers;
+ private TimerManager stateChangeTimers;
+
+ public PdpModifyRequestMapParams setParams(PdpParameters params) {
+ this.params = params;
+ return this;
+ }
+
+ public PdpModifyRequestMapParams setUpdateTimers(TimerManager updateTimers) {
+ this.updateTimers = updateTimers;
+ return this;
+ }
+
+ public PdpModifyRequestMapParams setStateChangeTimers(TimerManager stateChangeTimers) {
+ this.stateChangeTimers = stateChangeTimers;
+ return this;
+ }
+
+ @Override
+ public PdpModifyRequestMapParams setPublisher(Publisher publisher) {
+ super.setPublisher(publisher);
+ return this;
+ }
+
+ @Override
+ public PdpModifyRequestMapParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
+ super.setResponseDispatcher(responseDispatcher);
+ return this;
+ }
+
+ @Override
+ public PdpModifyRequestMapParams setModifyLock(Object modifyLock) {
+ super.setModifyLock(modifyLock);
+ return this;
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+
+ if (params == null) {
+ throw new IllegalArgumentException("missing PDP parameters");
+ }
+
+ if (updateTimers == null) {
+ throw new IllegalArgumentException("missing updateTimers");
+ }
+
+ if (stateChangeTimers == null) {
+ throw new IllegalArgumentException("missing stateChangeTimers");
+ }
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java
new file mode 100644
index 00000000..ea4b02ca
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import lombok.Getter;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.RequestData;
+
+
+/**
+ * Parameters needed to create a {@link RequestData}.
+ */
+@Getter
+public class RequestDataParams {
+ private Publisher publisher;
+ private RequestIdDispatcher<PdpStatus> responseDispatcher;
+ private Object modifyLock;
+
+ public RequestDataParams setPublisher(Publisher publisher) {
+ this.publisher = publisher;
+ return this;
+ }
+
+ public RequestDataParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) {
+ this.responseDispatcher = responseDispatcher;
+ return this;
+ }
+
+ public RequestDataParams setModifyLock(Object modifyLock) {
+ this.modifyLock = modifyLock;
+ return this;
+ }
+
+ /**
+ * Validates the parameters.
+ */
+ public void validate() {
+ if (publisher == null) {
+ throw new IllegalArgumentException("missing publisher");
+ }
+
+ if (responseDispatcher == null) {
+ throw new IllegalArgumentException("missing responseDispatcher");
+ }
+
+ if (modifyLock == null) {
+ throw new IllegalArgumentException("missing modifyLock");
+ }
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
index 913e661d..ed880aeb 100644
--- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
+++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
@@ -23,6 +23,7 @@ package org.onap.policy.pap.main.startstop;
import java.util.Arrays;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
@@ -34,7 +35,12 @@ import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.PolicyPapRuntimeException;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
import org.onap.policy.pap.main.parameters.PapParameterGroup;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
import org.onap.policy.pap.main.rest.PapRestServer;
import org.onap.policy.pap.main.rest.PapStatisticsManager;
@@ -81,8 +87,6 @@ public class PapActivator extends ServiceManagerContainer {
try {
this.papParameterGroup = papParameterGroup;
- papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName());
-
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
@@ -90,18 +94,26 @@ public class PapActivator extends ServiceManagerContainer {
throw new PolicyPapRuntimeException(e);
}
- this.msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher);
+ papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName());
final Object pdpUpdateLock = new Object();
+ PdpParameters pdpParams = papParameterGroup.getPdpParameters();
+ AtomicReference<Publisher> pdpPub = new AtomicReference<>();
+ AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
+ AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
// @formatter:off
addAction("PAP parameters",
() -> ParameterService.register(papParameterGroup),
() -> ParameterService.deregister(papParameterGroup.getName()));
- addAction("dispatcher",
- () -> registerDispatcher(),
- () -> unregisterDispatcher());
+ addAction("Request ID Dispatcher",
+ () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher),
+ () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));
+
+ addAction("Message Dispatcher",
+ () -> registerMsgDispatcher(),
+ () -> unregisterMsgDispatcher());
addAction("topics",
() -> TopicEndpoint.manager.start(),
@@ -111,21 +123,69 @@ public class PapActivator extends ServiceManagerContainer {
() -> Registry.register(PapConstants.REG_STATISTICS_MANAGER, new PapStatisticsManager()),
() -> Registry.unregister(PapConstants.REG_STATISTICS_MANAGER));
+ addAction("PDP publisher",
+ () -> {
+ pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP));
+ startThread(pdpPub.get());
+ },
+ () -> pdpPub.get().stop());
+
+ addAction("PDP update timers",
+ () -> {
+ pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs()));
+ startThread(pdpUpdTimers.get());
+ },
+ () -> pdpUpdTimers.get().stop());
+
+ addAction("PDP state-change timers",
+ () -> {
+ pdpStChgTimers.set(new TimerManager("state-change", pdpParams.getUpdateParameters().getMaxWaitMs()));
+ startThread(pdpStChgTimers.get());
+ },
+ () -> pdpStChgTimers.get().stop());
+
addAction("PDP modification lock",
() -> Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, pdpUpdateLock),
() -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK));
- addAction("REST server",
- () -> restServer = new PapRestServer(papParameterGroup.getRestServerParameters()),
- () -> { });
+ addAction("PDP modification requests",
+ () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap(
+ new PdpModifyRequestMapParams()
+ .setModifyLock(pdpUpdateLock)
+ .setParams(pdpParams)
+ .setPublisher(pdpPub.get())
+ .setResponseDispatcher(reqIdDispatcher)
+ .setStateChangeTimers(pdpStChgTimers.get())
+ .setUpdateTimers(pdpUpdTimers.get()))),
+ () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP));
+
+ addAction("Create REST server",
+ () -> {
+ restServer = new PapRestServer(papParameterGroup.getRestServerParameters());
+ },
+ () -> {
+ restServer = null;
+ });
- addAction("REST server thread",
+ addAction("REST server",
() -> restServer.start(),
() -> restServer.stop());
// @formatter:on
}
/**
+ * Starts a background thread.
+ *
+ * @param runner function to run in the background
+ */
+ private void startThread(Runnable runner) {
+ Thread thread = new Thread(runner);
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+
+ /**
* Get the parameters used by the activator.
*
* @return the parameters of the activator
@@ -137,7 +197,7 @@ public class PapActivator extends ServiceManagerContainer {
/**
* Registers the dispatcher with the topic source(s).
*/
- private void registerDispatcher() {
+ private void registerMsgDispatcher() {
for (TopicSource source : TopicEndpoint.manager
.getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
source.register(msgDispatcher);
@@ -147,7 +207,7 @@ public class PapActivator extends ServiceManagerContainer {
/**
* Unregisters the dispatcher from the topic source(s).
*/
- private void unregisterDispatcher() {
+ private void unregisterMsgDispatcher() {
for (TopicSource source : TopicEndpoint.manager
.getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) {
source.unregister(msgDispatcher);
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java
new file mode 100644
index 00000000..bbe75a44
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java
@@ -0,0 +1,575 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.models.base.PfConceptKey;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap.ModifyReqData;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpStateChangeParameters;
+import org.onap.policy.pap.main.parameters.PdpUpdateParameters;
+import org.powermock.reflect.Whitebox;
+
+public class PdpModifyRequestMapTest {
+ private static final String DIFFERENT = "-diff";
+ private static final String PDP1 = "pdp_1";
+
+ private static final int UPDATE_RETRIES = 2;
+ private static final int STATE_RETRIES = 1;
+
+ private PdpModifyRequestMap map;
+ private Publisher pub;
+ private RequestIdDispatcher<PdpStatus> disp;
+ private Object lock;
+ private TimerManager updTimers;
+ private TimerManager stateTimers;
+ private TimerManager.Timer timer;
+ private Queue<QueueToken<PdpMessage>> queue;
+ private PdpStatus response;
+ private PdpParameters pdpParams;
+ private PdpUpdateParameters updParams;
+ private PdpStateChangeParameters stateParams;
+ private PdpUpdate update;
+ private PdpStateChange state;
+ private String mismatchReason;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ pub = mock(Publisher.class);
+ disp = mock(RequestIdDispatcher.class);
+ lock = new Object();
+ updTimers = mock(TimerManager.class);
+ stateTimers = mock(TimerManager.class);
+ timer = mock(TimerManager.Timer.class);
+ queue = new LinkedList<>();
+ response = new PdpStatus();
+ pdpParams = mock(PdpParameters.class);
+ updParams = mock(PdpUpdateParameters.class);
+ stateParams = mock(PdpStateChangeParameters.class);
+ update = makeUpdate();
+ state = makeStateChange();
+ mismatchReason = null;
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ queue.add(invocation.getArgumentAt(0, QueueToken.class));
+ return null;
+ }
+ }).when(pub).enqueue(any());
+
+ when(updTimers.register(any(), any())).thenReturn(timer);
+ when(stateTimers.register(any(), any())).thenReturn(timer);
+
+ when(pdpParams.getUpdateParameters()).thenReturn(updParams);
+ when(pdpParams.getStateChangeParameters()).thenReturn(stateParams);
+
+ when(updParams.getMaxRetryCount()).thenReturn(UPDATE_RETRIES);
+ when(updParams.getMaxWaitMs()).thenReturn(1000L);
+
+ when(stateParams.getMaxRetryCount()).thenReturn(STATE_RETRIES);
+ when(stateParams.getMaxWaitMs()).thenReturn(1000L);
+
+ response.setName(PDP1);
+ response.setState(PdpState.SAFE);
+ response.setPdpGroup(update.getPdpGroup());
+ response.setPdpSubgroup(update.getPdpSubgroup());
+ response.setPolicies(update.getPolicies());
+
+ map = new PdpModifyRequestMap(makeParameters()) {
+
+ @Override
+ protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
+ return new ModifyReqData(update, stateChange) {
+ @Override
+ protected void mismatch(String reason) {
+ mismatchReason = reason;
+ super.mismatch(reason);
+ }
+ };
+ }
+ };
+
+ map = spy(map);
+ }
+
+ @Test
+ public void testAdd_DifferentPdps() {
+ map.addRequest(update);
+
+ state.setName(DIFFERENT);
+ map.addRequest(state);
+
+ assertNotNull(getReqData(PDP1));
+ assertNotNull(getReqData(DIFFERENT));
+
+ assertQueueContains("testAdd_DifferentPdps", update, state);
+ }
+
+ @Test
+ public void testAddRequestPdpUpdate() {
+ map.addRequest(update);
+
+ assertQueueContains("testAddRequestPdpUpdate", update);
+ }
+
+ @Test
+ public void testAddRequestPdpStateChange() {
+ map.addRequest(state);
+
+ assertQueueContains("testAddRequestPdpStateChange", state);
+ }
+
+ @Test
+ public void testAddRequestPdpUpdatePdpStateChange_Both() {
+ map.addRequest(update, state);
+
+ assertQueueContains("testAddRequestPdpUpdatePdpStateChange_Both", update);
+ }
+
+ @Test
+ public void testAddRequestPdpUpdatePdpStateChange_BothNull() {
+ map.addRequest(null, null);
+
+ // nothing should have been added to the queue
+ assertTrue(queue.isEmpty());
+ }
+
+ @Test
+ public void testGetPdpName_SameNames() {
+ // should be no exception
+ map.addRequest(update, state);
+ }
+
+ @Test
+ public void testGetPdpName_DifferentNames() {
+ // should be no exception
+ state.setName(update.getName() + "X");
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+ .withMessageContaining("does not match");
+ }
+
+ @Test
+ public void testGetPdpName_NullUpdateName() {
+ update.setName(null);
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update)).withMessageContaining("update");
+
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+ .withMessageContaining("update");
+
+ // both names are null
+ state.setName(null);
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state));
+ }
+
+ @Test
+ public void testGetPdpName_NullStateName() {
+ state.setName(null);
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(state)).withMessageContaining("state");
+
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state))
+ .withMessageContaining("state");
+
+ // both names are null
+ update.setName(null);
+ assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state));
+ }
+
+ @Test
+ public void testIsSamePdpUpdatePdpUpdate() {
+ map.addRequest(update);
+
+ // queue a similar request
+ PdpUpdate update2 = makeUpdate();
+ map.addRequest(update2);
+
+ // token should still have original message
+ assertQueueContains("testIsSamePdpUpdatePdpUpdate", update);
+ }
+
+ @Test
+ public void testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount() {
+ map.addRequest(update);
+
+ PdpUpdate update2 = makeUpdate();
+ update2.setPolicies(Arrays.asList(update.getPolicies().get(0)));
+ map.addRequest(update2);
+
+ // should have replaced the message in the token
+ assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount", update2);
+ }
+
+ @Test
+ public void testIsSamePdpUpdatePdpUpdate_DifferentGroup() {
+ map.addRequest(update);
+
+ // queue a similar request
+ PdpUpdate update2 = makeUpdate();
+ update2.setPdpGroup(update.getPdpGroup() + DIFFERENT);
+ map.addRequest(update2);
+
+ // should have replaced the message in the token
+ assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentGroup", update2);
+ }
+
+ @Test
+ public void testIsSamePdpUpdatePdpUpdate_DifferentSubGroup() {
+ map.addRequest(update);
+
+ PdpUpdate update2 = makeUpdate();
+ update2.setPdpSubgroup(update.getPdpSubgroup() + DIFFERENT);
+ map.addRequest(update2);
+
+ // should have replaced the message in the token
+ assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentSubGroup", update2);
+ }
+
+ @Test
+ public void testIsSamePdpUpdatePdpUpdate_DifferentPolicies() {
+ map.addRequest(update);
+
+ ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies());
+ policies.set(0, new ToscaPolicy(new PfConceptKey("policy-3-x", "2.0.0")));
+
+ PdpUpdate update2 = makeUpdate();
+ update2.setPolicies(policies);
+ map.addRequest(update2);
+
+ // should have replaced the message in the token
+ assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicies", update2);
+ }
+
+ @Test
+ public void testIsSamePdpStateChangePdpStateChange() {
+ map.addRequest(state);
+
+ // queue a similar request
+ PdpStateChange state2 = makeStateChange();
+ map.addRequest(state2);
+
+ // token should still have original message
+ assertQueueContains("testIsSamePdpStateChangePdpStateChange", state);
+ }
+
+ @Test
+ public void testIsSamePdpStateChangePdpStateChange_DifferentState() {
+ map.addRequest(state);
+
+ // queue a similar request
+ PdpStateChange state2 = makeStateChange();
+ state2.setState(PdpState.TERMINATED);
+ map.addRequest(state2);
+
+ // should have replaced the message in the token
+ assertQueueContains("testIsSamePdpStateChangePdpStateChange_DifferentState", state2);
+ }
+
+ @Test
+ public void testModifyReqDataIsActive() {
+ map.addRequest(update);
+
+ invokeProcessResponse();
+
+ // name should have been removed
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testModifyReqDataAddPdpUpdate() {
+ map.addRequest(state);
+
+ map.addRequest(update);
+
+ // update should have replaced the state-change in the queue
+ assertQueueContains("testModifyReqDataAddPdpUpdate", update);
+ }
+
+ @Test
+ public void testModifyReqDataAddPdpStateChange() {
+ map.addRequest(update);
+
+ map.addRequest(state);
+
+ // update should still be in the queue
+ assertQueueContains("testModifyReqDataAddPdpStateChange", update);
+ }
+
+ @Test
+ public void testModifyReqDataRetryCountExhausted() {
+ map.addRequest(state);
+
+ // timeout twice so that retry count is exhausted
+ invokeTimeoutHandler(stateTimers, STATE_RETRIES + 1);
+
+ // name should have been removed
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testModifyReqDataMismatch() {
+ map.addRequest(state);
+
+ // set up a response with incorrect info
+ response.setName(state.getName() + DIFFERENT);
+
+ invokeProcessResponse();
+
+ assertNotNull(mismatchReason);
+
+ // name should have been removed
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testUpdateDataGetMaxRetryCount() {
+ map.addRequest(update);
+ ModifyReqData reqdata = getReqData(PDP1);
+
+ for (int count = 0; count < UPDATE_RETRIES; ++count) {
+ assertTrue("update bump " + count, reqdata.bumpRetryCount());
+ }
+
+ assertFalse("update bump final", reqdata.bumpRetryCount());
+ }
+
+ @Test
+ public void testUpdateDataMismatch() {
+ map.addRequest(update);
+
+ response.setName(DIFFERENT);
+ invokeProcessResponse();
+
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testUpdateDataComplete() {
+ map.addRequest(update);
+
+ invokeProcessResponse();
+
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testUpdateDataComplete_MoreToGo() {
+ map.addRequest(update, state);
+
+ invokeProcessResponse();
+
+ assertNotNull(getReqData(PDP1));
+
+ assertSame(state, queue.poll().get());
+ }
+
+ @Test
+ public void testStateChangeDataMismatch() {
+ map.addRequest(state);
+
+ response.setName(DIFFERENT);
+ invokeProcessResponse();
+
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testStateChangeDataCompleted() {
+ map.addRequest(state);
+
+ invokeProcessResponse();
+
+ assertNull(getReqData(PDP1));
+ }
+
+ @Test
+ public void testMakeRequestData() {
+ // need a map that doesn't override the method
+ map = new PdpModifyRequestMap(makeParameters());
+
+ // this will invoke makeRequestData() - should not throw an exception
+ map.addRequest(update);
+
+ assertNotNull(getReqData(PDP1));
+ }
+
+ /**
+ * Asserts that the queue contains the specified messages.
+ *
+ * @param testName the test name
+ * @param messages messages that are expected in the queue
+ */
+ private void assertQueueContains(String testName, PdpMessage... messages) {
+ assertEquals(testName, messages.length, queue.size());
+
+ int count = 0;
+ for (PdpMessage msg : messages) {
+ ++count;
+
+ QueueToken<PdpMessage> token = queue.remove();
+ assertSame(testName + "-" + count, msg, token.get());
+ }
+ }
+
+ /**
+ * Makes parameters to configure a map.
+ *
+ * @return new parameters
+ */
+ private PdpModifyRequestMapParams makeParameters() {
+ return new PdpModifyRequestMapParams().setModifyLock(lock).setParams(pdpParams).setPublisher(pub)
+ .setResponseDispatcher(disp).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers);
+ }
+
+ /**
+ * Gets the listener that was registered with the dispatcher and invokes it.
+ *
+ * @return the response processor
+ */
+ @SuppressWarnings("unchecked")
+ private TypedMessageListener<PdpStatus> invokeProcessResponse() {
+ @SuppressWarnings("rawtypes")
+ ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class);
+
+ // indicate that is has been published
+ queue.remove().replaceItem(null);
+
+ verify(disp).register(any(), processResp.capture());
+
+ TypedMessageListener<PdpStatus> func = processResp.getValue();
+ func.onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response);
+
+ return func;
+ }
+
+ /**
+ * Gets the timeout handler that was registered with the timer manager and invokes it.
+ *
+ * @param timers the timer manager whose handler is to be invoked
+ * @param ntimes number of times to invoke the timeout handler
+ * @return the timeout handler
+ */
+ @SuppressWarnings("unchecked")
+ private void invokeTimeoutHandler(TimerManager timers, int ntimes) {
+ @SuppressWarnings("rawtypes")
+ ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class);
+
+ for (int count = 1; count <= ntimes; ++count) {
+ // indicate that is has been published
+ queue.remove().replaceItem(null);
+
+ verify(timers, times(count)).register(any(), timeoutHdlr.capture());
+
+ @SuppressWarnings("rawtypes")
+ List<Consumer> lst = timeoutHdlr.getAllValues();
+
+ Consumer<String> hdlr = lst.get(lst.size() - 1);
+ hdlr.accept(PDP1);
+ }
+ }
+
+ /**
+ * Gets the request data from the map.
+ *
+ * @param pdpName name of the PDP whose data is desired
+ * @return the request data, or {@code null} if the PDP is not in the map
+ */
+ private ModifyReqData getReqData(String pdpName) {
+ Map<String, ModifyReqData> name2data = Whitebox.getInternalState(map, "name2data");
+ return name2data.get(pdpName);
+ }
+
+ /**
+ * Makes an update message.
+ *
+ * @return a new update message
+ */
+ private PdpUpdate makeUpdate() {
+ PdpUpdate upd = new PdpUpdate();
+
+ upd.setDescription("update-description");
+ upd.setName(PDP1);
+ upd.setPdpGroup("group1-a");
+ upd.setPdpSubgroup("sub1-a");
+ upd.setPdpType("drools");
+
+ ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0"));
+ ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0"));
+
+ upd.setPolicies(Arrays.asList(policy1, policy2));
+
+ return upd;
+ }
+
+ /**
+ * Makes a state-change message.
+ *
+ * @return a new state-change message
+ */
+ private PdpStateChange makeStateChange() {
+ PdpStateChange cng = new PdpStateChange();
+
+ cng.setName(PDP1);
+ cng.setState(PdpState.SAFE);
+
+ return cng;
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
new file mode 100644
index 00000000..f15b2a04
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java
@@ -0,0 +1,265 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.resources.ResourceUtils;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.PolicyPapException;
+
+public class PublisherTest extends Threaded {
+
+ // these messages will have different request IDs
+ private static final PdpStateChange MSG1 = new PdpStateChange();
+ private static final PdpStateChange MSG2 = new PdpStateChange();
+
+ // MSG1 & MSG2, respectively, encoded as JSON
+ private static final String JSON1;
+ private static final String JSON2;
+
+ static {
+ try {
+ Coder coder = new StandardCoder();
+ JSON1 = coder.encode(MSG1);
+ JSON2 = coder.encode(MSG2);
+
+ } catch (CoderException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /**
+ * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
+ * published.
+ */
+ private static final long MAX_WAIT_MS = 5000;
+
+ private Publisher pub;
+ private MyListener listener;
+
+ /**
+ * Configures the topic and attaches a listener.
+ *
+ * @throws Exception if an error occurs
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Properties props = new Properties();
+ File propFile = new File(ResourceUtils.getFilePath4Resource("parameters/topic.properties"));
+ try (FileInputStream inp = new FileInputStream(propFile)) {
+ props.load(inp);
+ }
+
+ TopicEndpoint.manager.shutdown();
+
+ TopicEndpoint.manager.addTopics(props);
+ TopicEndpoint.manager.start();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TopicEndpoint.manager.shutdown();
+ }
+
+ /**
+ * Set up.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP);
+
+ listener = new MyListener();
+ TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener);
+ }
+
+ /**
+ * Tear down.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void tearDown() throws Exception {
+ TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).unregister(listener);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void stopThread() {
+ if (pub != null) {
+ pub.stop();
+ }
+ }
+
+ @Test
+ public void testPublisher_testStop() throws Exception {
+ startThread(pub);
+ pub.stop();
+
+ assertTrue(waitStop());
+
+ // ensure we can call "stop" a second time
+ pub.stop();
+ }
+
+ @Test
+ public void testPublisher_Ex() throws Exception {
+ assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class);
+ }
+
+ @Test
+ public void testEnqueue() throws Exception {
+ // enqueue before running
+ pub.enqueue(new QueueToken<>(MSG1));
+
+ // enqueue another after running
+ startThread(pub);
+ pub.enqueue(new QueueToken<>(MSG2));
+
+ String json = listener.await(MAX_WAIT_MS);
+ assertEquals(JSON1, json);
+
+ json = listener.await(MAX_WAIT_MS);
+ assertEquals(JSON2, json);
+ }
+
+ @Test
+ public void testRun_StopBeforeProcess() throws Exception {
+ // enqueue before running
+ QueueToken<PdpMessage> token = new QueueToken<>(MSG1);
+ pub.enqueue(token);
+
+ // stop before running
+ pub.stop();
+
+ // start the thread and then wait for it to stop
+ startThread(pub);
+ assertTrue(waitStop());
+
+ // message should not have been processed
+ assertTrue(listener.isEmpty());
+ assertNotNull(token.get());
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ startThread(pub);
+
+ // should skip token with null message
+ QueueToken<PdpMessage> token1 = new QueueToken<>(null);
+ pub.enqueue(token1);
+
+ QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2);
+ pub.enqueue(token2);
+
+ // only the second message should have been processed
+ String json = listener.await(MAX_WAIT_MS);
+ assertEquals(JSON2, json);
+ assertNull(token2.get());
+
+ pub.stop();
+ assertTrue(waitStop());
+
+ // no more messages
+ assertTrue(listener.isEmpty());
+ }
+
+ @Test
+ public void testGetNext() throws Exception {
+ startThread(pub);
+
+ // wait for a message to be processed
+ pub.enqueue(new QueueToken<>(MSG1));
+ assertNotNull(listener.await(MAX_WAIT_MS));
+
+ // now interrupt
+ interruptThread();
+
+ assertTrue(waitStop());
+ }
+
+ /**
+ * Listener for messages published to the topic.
+ */
+ private static class MyListener implements TopicListener {
+
+ /**
+ * Released every time a message is added to the queue.
+ */
+ private final Semaphore sem = new Semaphore(0);
+
+ private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>();
+
+ public boolean isEmpty() {
+ return messages.isEmpty();
+ }
+
+ /**
+ * Waits for a message to be published to the topic.
+ *
+ * @param waitMs time to wait, in milli-seconds
+ * @return the next message in the queue, or {@code null} if there are no messages
+ * or if the timeout was reached
+ * @throws InterruptedException if this thread was interrupted while waiting
+ */
+ public String await(long waitMs) throws InterruptedException {
+ if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) {
+ return messages.poll();
+ }
+
+ return null;
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
+ messages.add(event);
+ sem.release();
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java
new file mode 100644
index 00000000..3ff91edf
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.Test;
+
+public class QueueTokenTest {
+ private static final String STRING1 = "a string";
+ private static final String STRING2 = "another string";
+
+ private QueueToken<String> token;
+
+ @Test
+ public void test() throws Exception {
+ token = new QueueToken<>(STRING1);
+ assertEquals(STRING1, token.get());
+
+ assertEquals(STRING1, token.replaceItem(STRING2));
+ assertEquals(STRING2, token.get());
+
+ assertEquals(STRING2, token.replaceItem(null));
+ assertEquals(null, token.get());
+
+ assertEquals(null, token.replaceItem(null));
+ assertEquals(null, token.get());
+
+ assertEquals(null, token.replaceItem(STRING1));
+ assertEquals(null, token.get());
+
+ /*
+ * Now do some mult-threaded tests, hopefully causing some contention.
+ */
+
+ token = new QueueToken<>("");
+
+ Set<String> values = ConcurrentHashMap.newKeySet();
+
+ // create and configure the threads
+ Thread[] threads = new Thread[100];
+ for (int x = 0; x < threads.length; ++x) {
+ final int xfinal = x;
+ threads[x] = new Thread(() -> values.add(token.replaceItem("me-" + xfinal)));
+ threads[x].setDaemon(true);
+ }
+
+ // start the threads all at once
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // wait for the threads to stop
+ for (Thread thread : threads) {
+ thread.join(5000);
+ }
+
+ values.add(token.replaceItem(null));
+
+ for (int x = 0; x < threads.length; ++x) {
+ String msg = "me-" + x;
+ assertTrue(msg, values.contains(msg));
+ }
+ }
+
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java
new file mode 100644
index 00000000..28e5cf96
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java
@@ -0,0 +1,476 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.common.utils.services.ServiceManager;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.comm.msgdata.MessageData;
+import org.onap.policy.pap.main.parameters.RequestDataParams;
+import org.powermock.reflect.Whitebox;
+
+public class RequestDataTest {
+ private static final String PDP1 = "pdp_1";
+ private static final String MY_MSG_TYPE = "my-type";
+
+ private MyRequestData reqdata;
+ private Publisher pub;
+ private RequestIdDispatcher<PdpStatus> disp;
+ private Object lock;
+ private TimerManager timers;
+ private TimerManager.Timer timer;
+ private MyMessageData msgdata;
+ private Queue<QueueToken<PdpMessage>> queue;
+ private PdpStatus response;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ pub = mock(Publisher.class);
+ disp = mock(RequestIdDispatcher.class);
+ lock = new Object();
+ timers = mock(TimerManager.class);
+ timer = mock(TimerManager.Timer.class);
+ msgdata = new MyMessageData(PDP1);
+ queue = new LinkedList<>();
+ response = new PdpStatus();
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ queue.add(invocation.getArgumentAt(0, QueueToken.class));
+ return null;
+ }
+ }).when(pub).enqueue(any());
+
+ when(timers.register(any(), any())).thenReturn(timer);
+
+ reqdata = new MyRequestData(
+ new RequestDataParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp));
+
+ reqdata.setName(PDP1);
+
+ msgdata = spy(msgdata);
+ reqdata = spy(reqdata);
+ }
+
+ @Test
+ public void testRequestData_Invalid() {
+ // null params
+ assertThatThrownBy(() -> new MyRequestData(null)).isInstanceOf(NullPointerException.class);
+
+ // invalid params
+ assertThatIllegalArgumentException().isThrownBy(() -> new MyRequestData(new RequestDataParams()));
+ }
+
+ @Test
+ public void testStartPublishing() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ verify(disp).register(eq(msgdata.getMessage().getRequestId()), any());
+ verify(timers).register(eq(PDP1), any());
+ verify(pub).enqueue(any());
+
+ QueueToken<PdpMessage> token = queue.poll();
+ assertNotNull(token);
+ assertSame(msgdata.getMessage(), token.get());
+
+
+ // invoking start() again has no effect - invocation counts remain the same
+ reqdata.startPublishing();
+ verify(disp, times(1)).register(eq(msgdata.getMessage().getRequestId()), any());
+ verify(timers, times(1)).register(eq(PDP1), any());
+ verify(pub, times(1)).enqueue(any());
+ }
+
+ @Test
+ public void testStopPublishing() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+ reqdata.stopPublishing();
+
+ verify(disp).unregister(msgdata.getMessage().getRequestId());
+ verify(timer).cancel();
+
+
+ // invoking stop() again has no effect - invocation counts remain the same
+ reqdata.stopPublishing();
+
+ verify(disp, times(1)).unregister(msgdata.getMessage().getRequestId());
+ verify(timer, times(1)).cancel();
+ }
+
+ @Test
+ public void testConfigure() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ verify(disp).register(eq(msgdata.getMessage().getRequestId()), any());
+ verify(timers).register(eq(PDP1), any());
+ verify(pub).enqueue(any());
+
+ ServiceManager svcmgr = Whitebox.getInternalState(reqdata, "svcmgr");
+ assertEquals(PDP1 + " " + MY_MSG_TYPE, svcmgr.getName());
+
+
+ // bump this so we can verify that it is reset by configure()
+ reqdata.bumpRetryCount();
+
+ reqdata.configure(msgdata);
+ assertEquals(0, getRetryCount());
+ }
+
+ @Test
+ public void testEnqueue() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ // replace the message with a new message
+ reqdata.stopPublishing();
+ MyMessageData msgdata2 = new MyMessageData(PDP1);
+ reqdata.configure(msgdata2);
+ reqdata.startPublishing();
+
+ // should still only be one token in the queue
+ QueueToken<PdpMessage> token = queue.poll();
+ assertNull(queue.poll());
+ assertNotNull(token);
+ assertSame(msgdata2.getMessage(), token.get());
+
+ // null out the token
+ token.replaceItem(null);
+
+ // enqueue a new message
+ reqdata.stopPublishing();
+ MyMessageData msgdata3 = new MyMessageData(PDP1);
+ reqdata.configure(msgdata3);
+ reqdata.startPublishing();
+
+ // a new token should have been placed in the queue
+ QueueToken<PdpMessage> token2 = queue.poll();
+ assertTrue(token != token2);
+ assertNull(queue.poll());
+ assertNotNull(token2);
+ assertSame(msgdata3.getMessage(), token2.get());
+ }
+
+ @Test
+ public void testResetRetryCount_testBumpRetryCount() {
+ when(msgdata.getMaxRetryCount()).thenReturn(2);
+
+ reqdata.configure(msgdata);
+
+ assertEquals(0, getRetryCount());
+ assertTrue(reqdata.bumpRetryCount());
+ assertTrue(reqdata.bumpRetryCount());
+
+ // limit should now be reached and it should go no further
+ assertFalse(reqdata.bumpRetryCount());
+ assertFalse(reqdata.bumpRetryCount());
+
+ assertEquals(2, getRetryCount());
+
+ reqdata.resetRetryCount();
+ assertEquals(0, getRetryCount());
+ }
+
+ @Test
+ public void testRetryCountExhausted() {
+ reqdata.configure(msgdata);
+
+ reqdata.retryCountExhausted();
+
+ verify(reqdata).allCompleted();
+ }
+
+ @Test
+ public void testProcessResponse() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ invokeProcessResponse();
+
+ verify(reqdata).stopPublishing();
+ verify(msgdata).checkResponse(response);
+ verify(msgdata).completed();
+ }
+
+ @Test
+ public void testProcessResponse_NotPublishing() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ reqdata.stopPublishing();
+
+ invokeProcessResponse();
+
+ // only invocation should have been the one before calling invokeProcessResponse()
+ verify(reqdata, times(1)).stopPublishing();
+
+ verify(msgdata, never()).checkResponse(response);
+ verify(msgdata, never()).completed();
+ }
+
+ @Test
+ public void testProcessResponse_NotActive() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ when(reqdata.isActive()).thenReturn(false);
+
+ invokeProcessResponse();
+
+ // it should still stop publishing
+ verify(reqdata).stopPublishing();
+
+ verify(msgdata, never()).checkResponse(response);
+ verify(msgdata, never()).completed();
+ }
+
+ @Test
+ public void testProcessResponse_ResponseFailed() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ when(msgdata.checkResponse(response)).thenReturn("failed");
+
+ invokeProcessResponse();
+
+ verify(reqdata).stopPublishing();
+ verify(msgdata).checkResponse(response);
+
+ verify(msgdata, never()).completed();
+ verify(msgdata).mismatch("failed");
+ }
+
+ @Test
+ public void testHandleTimeout() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ // remove it from the queue
+ queue.poll().replaceItem(null);
+
+ invokeTimeoutHandler();
+
+ // count should have been bumped
+ assertEquals(1, getRetryCount());
+
+ // should have invoked startPublishing() a second time
+ verify(reqdata, times(2)).startPublishing();
+ }
+
+ @Test
+ public void testHandleTimeout_NotPublishing() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ reqdata.stopPublishing();
+
+ invokeTimeoutHandler();
+
+ // should NOT have invoked startPublishing() a second time
+ verify(reqdata, times(1)).startPublishing();
+ verify(reqdata, never()).retryCountExhausted();
+ }
+
+ @Test
+ public void testHandleTimeout_NotActive() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ when(reqdata.isActive()).thenReturn(false);
+
+ invokeTimeoutHandler();
+
+ // should NOT have invoked startPublishing() a second time
+ verify(reqdata, times(1)).startPublishing();
+ verify(reqdata, never()).retryCountExhausted();
+ }
+
+ @Test
+ public void testHandleTimeout_StillInQueue() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ reqdata.bumpRetryCount();
+
+ invokeTimeoutHandler();
+
+ // count should reset the count
+ assertEquals(0, getRetryCount());
+
+ // should have invoked startPublishing() a second time
+ verify(reqdata, times(2)).startPublishing();
+ }
+
+ @Test
+ public void testHandleTimeout_RetryExhausted() {
+ reqdata.configure(msgdata);
+ reqdata.startPublishing();
+
+ // exhaust the count
+ reqdata.bumpRetryCount();
+ reqdata.bumpRetryCount();
+ reqdata.bumpRetryCount();
+
+ // remove it from the queue
+ queue.poll().replaceItem(null);
+
+ invokeTimeoutHandler();
+
+ // should NOT have invoked startPublishing() a second time
+ verify(reqdata, times(1)).startPublishing();
+
+ verify(reqdata).retryCountExhausted();
+ }
+
+ @Test
+ public void testGetName_testSetName() {
+ reqdata.setName("abc");
+ assertEquals("abc", reqdata.getName());
+ }
+
+ @Test
+ public void testGetWrapper() {
+ reqdata.configure(msgdata);
+ assertSame(msgdata, reqdata.getWrapper());
+ }
+
+ /**
+ * Gets the retry count from the data.
+ * @return the current retry count
+ */
+ private int getRetryCount() {
+ return Whitebox.getInternalState(reqdata, "retryCount");
+ }
+
+ /**
+ * Gets the listener that was registered with the dispatcher and invokes it.
+ */
+ @SuppressWarnings("unchecked")
+ private void invokeProcessResponse() {
+ @SuppressWarnings("rawtypes")
+ ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class);
+
+ verify(disp).register(any(), processResp.capture());
+
+ processResp.getValue().onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response);
+ }
+
+ /**
+ * Gets the timeout handler that was registered with the timer manager and invokes it.
+ */
+ @SuppressWarnings("unchecked")
+ private void invokeTimeoutHandler() {
+ @SuppressWarnings("rawtypes")
+ ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class);
+
+ verify(timers).register(any(), timeoutHdlr.capture());
+
+ timeoutHdlr.getValue().accept(PDP1);
+ }
+
+ private class MyRequestData extends RequestData {
+
+ public MyRequestData(RequestDataParams params) {
+ super(params);
+ }
+
+ @Override
+ protected boolean isActive() {
+ return true;
+ }
+
+ @Override
+ protected void allCompleted() {
+ // do nothing
+ }
+ }
+
+ private class MyMessageData extends MessageData {
+
+ public MyMessageData(String pdpName) {
+ super(new PdpStateChange(), 1, timers);
+
+ PdpStateChange msg = (PdpStateChange) getMessage();
+ msg.setName(pdpName);
+ msg.setState(PdpState.ACTIVE);
+ }
+
+ @Override
+ public String getType() {
+ return MY_MSG_TYPE;
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ // do nothing
+ }
+
+ @Override
+ public void completed() {
+ // do nothing
+ }
+
+ @Override
+ public String checkResponse(PdpStatus response) {
+ // always valid - return null
+ return null;
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java b/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java
new file mode 100644
index 00000000..d6a0d1f1
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Super class for tests that run a background thread.
+ */
+public abstract class Threaded {
+
+ /**
+ * Max time to wait, in milliseconds, for a thread to terminate or for a message to be
+ * published.
+ */
+ public static final long MAX_WAIT_MS = 5000;
+
+ /**
+ * The current background thread.
+ */
+ private Thread thread;
+
+ /**
+ * Indicates that a test is about to begin.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ thread = null;
+ }
+
+ /**
+ * Invokes the "stopper" function to tell the background thread to exit and then waits
+ * for it to terminate.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void tearDown() throws Exception {
+ stopThread();
+ waitStop();
+ }
+
+ /**
+ * Signals the background thread to stop.
+ *
+ * @throws Exception if an error occurs
+ */
+ protected abstract void stopThread() throws Exception;
+
+ /**
+ * Starts a background thread.
+ *
+ * @param runner what should be executed in the background thread
+ * @throws IllegalStateException if a background thread is already running
+ */
+ protected void startThread(Runnable runner) {
+ if (thread != null) {
+ throw new IllegalStateException("a background thread is already running");
+ }
+
+ thread = new Thread(runner);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Interrupts the background thread.
+ */
+ protected void interruptThread() {
+ thread.interrupt();
+ }
+
+ /**
+ * Waits for the background thread to stop.
+ *
+ * @return {@code true} if the thread has stopped, {@code false} otherwise
+ * @throws InterruptedException if this thread is interrupted while waiting
+ */
+ protected boolean waitStop() throws InterruptedException {
+ if (thread != null) {
+ Thread thread2 = thread;
+ thread = null;
+
+ thread2.join(MAX_WAIT_MS);
+
+ return !thread2.isAlive();
+ }
+
+ return true;
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java
new file mode 100644
index 00000000..3d5da908
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java
@@ -0,0 +1,400 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.pap.main.comm.TimerManager.Timer;
+
+public class TimerManagerTest extends Threaded {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String MGR_NAME = "my-manager";
+ private static final String NAME1 = "timer-A";
+ private static final String NAME2 = "timer-B";
+ private static final String NAME3 = "timer-C";
+
+ private static final long MGR_TIMEOUT_MS = 10000;
+
+ private MyManager mgr;
+
+ /*
+ * This is a field to prevent checkstyle from complaining about the distance between
+ * its assignment and its use.
+ */
+ private long tcur;
+
+ /**
+ * Sets up.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ mgr = new MyManager(MGR_NAME, MGR_TIMEOUT_MS);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @Override
+ protected void stopThread() throws Exception {
+ if (mgr != null) {
+ mgr.stop();
+ mgr.stopSleep();
+ }
+ }
+
+ @Test
+ public void testTimerManager_testStop() throws Exception {
+ startThread(mgr);
+
+ mgr.stop();
+ assertTrue(waitStop());
+
+ // ensure we can call "stop" a second time
+ mgr.stop();
+ }
+
+ @Test
+ public void testRegister() throws Exception {
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.register(NAME1, mgr::addToQueue);
+
+ // goes to the end of the queue
+ mgr.register(NAME2, mgr::addToQueue);
+
+ startThread(mgr);
+
+ mgr.allowSleep(2);
+
+ assertEquals(NAME1, mgr.awaitTimer());
+ assertEquals(NAME2, mgr.awaitTimer());
+ }
+
+ @Test
+ public void testRun_Ex() throws Exception {
+ startThread(mgr);
+ mgr.register(NAME1, mgr::addToQueue);
+
+ mgr.awaitSleep();
+
+ // background thread is "sleeping" - now we can interrupt it
+ interruptThread();
+
+ assertTrue(waitStop());
+ }
+
+ @Test
+ public void testProcessTimers() throws Exception {
+ startThread(mgr);
+ mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+ mgr.allowSleep(1);
+
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ // tell it to stop before returning from "sleep"
+ mgr.stop();
+ mgr.allowSleep(1);
+
+ assertTrue(waitStop());
+
+ assertEquals(NAME1, mgr.pollResult());
+ assertNull(mgr.pollResult());
+ }
+
+ @Test
+ public void testGetNextTimer() throws Exception {
+ startThread(mgr);
+ mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+ mgr.allowSleep(1);
+
+ mgr.register(NAME2, mgr::addToQueue);
+
+ mgr.awaitSleep();
+ }
+
+ @Test
+ public void testProcessTimer_StopWhileWaiting() throws Exception {
+ startThread(mgr);
+ mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+ mgr.allowSleep(1);
+
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ mgr.stop();
+ mgr.allowSleep(1);
+
+ assertTrue(waitStop());
+
+ // should have stopped after processing the first timer
+ assertEquals(NAME1, mgr.pollResult());
+ assertNull(mgr.pollResult());
+ }
+
+ @Test
+ public void testProcessTimer_CancelWhileWaiting() throws Exception {
+ startThread(mgr);
+ Timer timer = mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ timer.cancel();
+ mgr.allowSleep(1);
+
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.awaitSleep();
+ mgr.allowSleep(1);
+
+ mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ // should have fired timer 2, but not timer 1
+ assertEquals(NAME2, mgr.pollResult());
+ assertNull(mgr.pollResult());
+ }
+
+ @Test
+ public void testProcessTimer_TimerEx() throws Exception {
+ startThread(mgr);
+ mgr.register(NAME1, name -> {
+ throw new RuntimeException(EXPECTED_EXCEPTION);
+ });
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ mgr.allowSleep(2);
+
+ mgr.register(NAME3, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ // timer 1 fired but threw an exception, so only timer 2 should be in the queue
+ assertEquals(NAME2, mgr.pollResult());
+ }
+
+ @Test
+ public void testTimerAwait() throws Exception {
+ startThread(mgr);
+
+ // same times - only need one sleep
+ mgr.register(NAME1, mgr::addToQueue);
+ mgr.register(NAME2, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ tcur = mgr.currentTimeMillis();
+
+ mgr.allowSleep(1);
+
+ // next one will have a new timeout, so expect to sleep
+ mgr.register(NAME3, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ long tcur2 = mgr.currentTimeMillis();
+ assertTrue(tcur2 >= tcur + MGR_TIMEOUT_MS);
+
+ assertEquals(NAME1, mgr.pollResult());
+ assertEquals(NAME2, mgr.pollResult());
+ assertNull(mgr.pollResult());
+ }
+
+ @Test
+ public void testTimerCancel_WhileWaiting() throws Exception {
+ startThread(mgr);
+
+ Timer timer = mgr.register(NAME1, mgr::addToQueue);
+ mgr.awaitSleep();
+
+ // cancel while sleeping
+ timer.cancel();
+
+ mgr.register(NAME2, mgr::addToQueue);
+
+ // allow it to sleep through both timers
+ mgr.allowSleep(2);
+
+ // only timer 2 should have fired
+ assertEquals(NAME2, mgr.timedPollResult());
+ }
+
+ @Test
+ public void testTimerCancel_ViaReplace() throws Exception {
+ startThread(mgr);
+
+ mgr.register(NAME1, name -> mgr.addToQueue("hello"));
+ mgr.awaitSleep();
+
+ // replace the timer while the background thread is sleeping
+ mgr.register(NAME1, name -> mgr.addToQueue("world"));
+
+ // allow it to sleep through both timers
+ mgr.allowSleep(2);
+
+ // only timer 2 should have fired
+ assertEquals("world", mgr.timedPollResult());
+ }
+
+ @Test
+ public void testTimerToString() {
+ Timer timer = mgr.register(NAME1, mgr::addToQueue);
+ assertNotNull(timer.toString());
+ }
+
+ @Test
+ public void testCurrentTimeMillis() {
+ long tbeg = System.currentTimeMillis();
+ long tcur = new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).currentTimeMillis();
+ long tend = System.currentTimeMillis();
+
+ assertTrue(tcur >= tbeg);
+ assertTrue(tend >= tcur);
+ }
+
+ @Test
+ public void testSleep() throws Exception {
+ long tbeg = System.currentTimeMillis();
+ new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).sleep(10);
+ long tend = System.currentTimeMillis();
+
+ assertTrue(tend >= tbeg + 10);
+ }
+
+ private static class MyManager extends TimerManager {
+ private AtomicLong curTime = new AtomicLong(1000);
+ private LinkedBlockingQueue<Boolean> sleepEntered = new LinkedBlockingQueue<>();
+ private LinkedBlockingQueue<Boolean> shouldStop = new LinkedBlockingQueue<>();
+ private LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();
+
+ public MyManager(String name, long waitTimeMs) {
+ super(name, waitTimeMs);
+ }
+
+ /**
+ * Registers a timer. Also increments {@link #curTime} so that every time has a
+ * different expiration time, which prevents some issue with the junit tests.
+ */
+ @Override
+ public Timer register(String timerName, Consumer<String> action) {
+ curTime.addAndGet(1);
+ return super.register(timerName, action);
+ }
+
+ /**
+ * Stops the "sleep".
+ */
+ public void stopSleep() {
+ shouldStop.add(true);
+ }
+
+ /**
+ * Allows the manager to "sleep" several times.
+ *
+ * @param ntimes the number of times the manager should sleep
+ */
+ public void allowSleep(int ntimes) {
+ for (int x = 0; x < ntimes; ++x) {
+ shouldStop.add(false);
+ }
+ }
+
+ /**
+ * Waits for the manager to "sleep".
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting for the
+ * background thread to sleep
+ */
+ public void awaitSleep() throws InterruptedException {
+ if (sleepEntered.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS) == null) {
+ fail("background thread failed to sleep");
+ }
+ }
+
+ @Override
+ protected long currentTimeMillis() {
+ return curTime.get();
+ }
+
+ @Override
+ protected void sleep(long timeMs) throws InterruptedException {
+ sleepEntered.offer(true);
+
+ if (!shouldStop.take()) {
+ // test thread did not request that we stop
+ curTime.addAndGet(timeMs);
+ }
+ }
+
+ /**
+ * Waits for a timer to fire.
+ *
+ * @return the message the timer added to {@link #results}
+ * @throws InterruptedException if this thread is interrupted while waiting
+ */
+ private String awaitTimer() throws InterruptedException {
+ return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Adds a name to the queue.
+ *
+ * @param name the name to add
+ */
+ private void addToQueue(String name) {
+ results.add(name);
+ }
+
+ /**
+ * Polls for a result.
+ *
+ * @return the next result, or {@code null}
+ */
+ private String pollResult() {
+ return results.poll();
+ }
+
+ /**
+ * Polls for a result, waiting a limited amount of time.
+ *
+ * @return the next result, or {@code null}
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ private String timedPollResult() throws InterruptedException {
+ return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java
new file mode 100644
index 00000000..68b02635
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+public class MessageDataTest {
+ private static final int RETRIES = 1;
+
+ private MyData data;
+ private TimerManager timers;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ timers = mock(TimerManager.class);
+
+ data = new MyData();
+ }
+
+ @Test
+ public void testGetMessage() {
+ assertNotNull(data.getMessage());
+ }
+
+ @Test
+ public void testGetType() {
+ assertEquals(PdpStateChange.class.getSimpleName(), data.getType());
+ }
+
+ @Test
+ public void testGetMaxRetryCount() {
+ assertEquals(RETRIES, data.getMaxRetryCount());
+ }
+
+ @Test
+ public void testGetTimers() {
+ assertSame(timers, data.getTimers());
+ }
+
+ private class MyData extends MessageData {
+
+ public MyData() {
+ super(new PdpStateChange(), RETRIES, timers);
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ // do nothing
+ }
+
+ @Override
+ public void completed() {
+ // do nothing
+ }
+
+ @Override
+ public String checkResponse(PdpStatus response) {
+ // always succeed
+ return null;
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java
new file mode 100644
index 00000000..029775fa
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpStateChangeParameters;
+
+public class StateChangeDataTest {
+ private static final String MY_NAME = "my-name";
+ private static final String DIFFERENT = "different";
+ private static final PdpState MY_STATE = PdpState.SAFE;
+ private static final PdpState DIFF_STATE = PdpState.TERMINATED;
+ private static final int RETRIES = 1;
+
+ private MyData data;
+ private PdpModifyRequestMapParams params;
+ private TimerManager timers;
+ private PdpStatus response;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ timers = mock(TimerManager.class);
+ response = new PdpStatus();
+ PdpParameters pdpParams = mock(PdpParameters.class);
+ PdpStateChangeParameters stateParams = mock(PdpStateChangeParameters.class);
+
+ when(stateParams.getMaxRetryCount()).thenReturn(RETRIES);
+ when(pdpParams.getStateChangeParameters()).thenReturn(stateParams);
+
+ params = new PdpModifyRequestMapParams().setParams(pdpParams).setStateChangeTimers(timers);
+
+ response.setName(MY_NAME);
+ response.setState(MY_STATE);
+
+ data = new MyData();
+ }
+
+ @Test
+ public void testGetMaxRetryCount() {
+ assertEquals(RETRIES, data.getMaxRetryCount());
+ }
+
+ @Test
+ public void testGetTimers() {
+ assertSame(timers, data.getTimers());
+ }
+
+ @Test
+ public void testStateChangeCheckResponse() {
+ assertNull(data.checkResponse(response));
+ }
+
+ @Test
+ public void testStateChangeCheckResponse_MismatchedName() {
+ response.setName(DIFFERENT);
+
+ assertEquals("name does not match", data.checkResponse(response));
+ }
+
+ @Test
+ public void testStateChangeCheckResponse_MismatchedState() {
+ response.setState(DIFF_STATE);
+
+ assertEquals("state is TERMINATED, but expected SAFE", data.checkResponse(response));
+ }
+
+ private class MyData extends StateChangeData {
+
+ public MyData() {
+ super(new PdpStateChange(), params);
+
+ PdpStateChange msg = (PdpStateChange) getMessage();
+
+ msg.setName(MY_NAME);
+ msg.setState(MY_STATE);
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ // do nothing
+ }
+
+ @Override
+ public void completed() {
+ // do nothing
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java
new file mode 100644
index 00000000..8676c95e
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java
@@ -0,0 +1,169 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm.msgdata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.base.PfConceptKey;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy;
+import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.PdpParameters;
+import org.onap.policy.pap.main.parameters.PdpUpdateParameters;
+
+public class UpdateDataTest {
+ private static final String MY_NAME = "my-name";
+ private static final String DIFFERENT = "different";
+ private static final int RETRIES = 1;
+
+ private MyData data;
+ private PdpModifyRequestMapParams params;
+ private TimerManager timers;
+ private PdpUpdate update;
+ private PdpStatus response;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ timers = mock(TimerManager.class);
+ response = new PdpStatus();
+ PdpParameters pdpParams = mock(PdpParameters.class);
+ PdpUpdateParameters stateParams = mock(PdpUpdateParameters.class);
+
+ when(stateParams.getMaxRetryCount()).thenReturn(RETRIES);
+ when(pdpParams.getUpdateParameters()).thenReturn(stateParams);
+
+ params = new PdpModifyRequestMapParams().setParams(pdpParams).setUpdateTimers(timers);
+
+ update = makeUpdate();
+
+ response.setName(MY_NAME);
+ response.setPdpGroup(update.getPdpGroup());
+ response.setPdpSubgroup(update.getPdpSubgroup());
+ response.setPolicies(update.getPolicies());
+
+ data = new MyData(update);
+ }
+
+ @Test
+ public void testGetMaxRetryCount() {
+ assertEquals(RETRIES, data.getMaxRetryCount());
+ }
+
+ @Test
+ public void testGetTimers() {
+ assertSame(timers, data.getTimers());
+ }
+
+ @Test
+ public void testUpdateCheckResponse() {
+ assertNull(data.checkResponse(response));
+ }
+
+ @Test
+ public void testUpdateDataCheckResponse_MismatchedName() {
+ response.setName(DIFFERENT);
+
+ assertEquals("name does not match", data.checkResponse(response));
+ }
+
+ @Test
+ public void testUpdateDataCheckResponse_MismatchedGroup() {
+ response.setPdpGroup(DIFFERENT);
+
+ assertEquals("group does not match", data.checkResponse(response));
+ }
+
+ @Test
+ public void testUpdateDataCheckResponse_MismatchedSubGroup() {
+ response.setPdpSubgroup(DIFFERENT);
+
+ assertEquals("subgroup does not match", data.checkResponse(response));
+ }
+
+ @Test
+ public void testUpdateDataCheckResponse_MismatchedPoliciesLength() {
+ response.setPolicies(Arrays.asList(update.getPolicies().get(0)));
+
+ assertEquals("policies do not match", data.checkResponse(response));
+ }
+
+ @Test
+ public void testUpdateDataCheckResponse_MismatchedPolicies() {
+ ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies());
+ policies.set(0, new ToscaPolicy(new PfConceptKey(DIFFERENT, "10.0.0")));
+
+ response.setPolicies(policies);
+
+ assertEquals("policies do not match", data.checkResponse(response));
+ }
+
+ /**
+ * Makes an update message.
+ *
+ * @return a new update message
+ */
+ private PdpUpdate makeUpdate() {
+ PdpUpdate upd = new PdpUpdate();
+
+ upd.setDescription("update-description");
+ upd.setName(MY_NAME);
+ upd.setPdpGroup("group1-a");
+ upd.setPdpSubgroup("sub1-a");
+ upd.setPdpType("drools");
+
+ ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0"));
+ ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0"));
+
+ upd.setPolicies(Arrays.asList(policy1, policy2));
+
+ return upd;
+ }
+
+ private class MyData extends UpdateData {
+
+ public MyData(PdpUpdate message) {
+ super(message, params);
+ }
+
+ @Override
+ public void mismatch(String reason) {
+ // do nothing
+ }
+
+ @Override
+ public void completed() {
+ // do nothing
+ }
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java
new file mode 100644
index 00000000..3e691899
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+import org.onap.policy.pap.main.comm.TimerManager;
+
+public class TestPdpModifyRequestMapParams {
+ private PdpModifyRequestMapParams params;
+ private Publisher pub;
+ private RequestIdDispatcher<PdpStatus> disp;
+ private Object lock;
+ private PdpParameters pdpParams;
+ private TimerManager updTimers;
+ private TimerManager stateTimers;
+
+ /**
+ * Sets up the objects and creates an empty {@link #params}.
+ */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ pub = mock(Publisher.class);
+ disp = mock(RequestIdDispatcher.class);
+ lock = new Object();
+ pdpParams = mock(PdpParameters.class);
+ updTimers = mock(TimerManager.class);
+ stateTimers = mock(TimerManager.class);
+
+ params = new PdpModifyRequestMapParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp);
+ }
+
+ @Test
+ public void testGettersSetters() {
+ assertSame(params, params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers));
+
+ assertSame(pdpParams, params.getParams());
+ assertSame(updTimers, params.getUpdateTimers());
+ assertSame(stateTimers, params.getStateChangeTimers());
+
+ // super class data should also be available
+ assertSame(pub, params.getPublisher());
+ assertSame(disp, params.getResponseDispatcher());
+ assertSame(lock, params.getModifyLock());
+ }
+
+ @Test
+ public void testValidate() {
+ // no exception
+ params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate();
+ }
+
+ @Test
+ public void testValidate_MissingPdpParams() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate())
+ .withMessageContaining("PDP param");
+ }
+
+ @Test
+ public void testValidate_MissingStateChangeTimers() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setParams(pdpParams).setUpdateTimers(updTimers).validate())
+ .withMessageContaining("state");
+ }
+
+ @Test
+ public void testValidate_MissingUpdateTimers() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setParams(pdpParams).setStateChangeTimers(stateTimers).validate())
+ .withMessageContaining("update");
+ }
+
+ @Test
+ public void testValidate_MissingSuperclassData() {
+ // leave out one of the superclass fields
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> new PdpModifyRequestMapParams()
+ .setPublisher(pub)
+ .setResponseDispatcher(disp).setParams(pdpParams).setStateChangeTimers(stateTimers)
+ .setUpdateTimers(updTimers).validate()).withMessageContaining("Lock");
+
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java
new file mode 100644
index 00000000..16d247f2
--- /dev/null
+++ b/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.pap.main.comm.Publisher;
+
+public class TestRequestDataParams {
+ private RequestDataParams params;
+ private Publisher pub;
+ private RequestIdDispatcher<PdpStatus> disp;
+ private Object lock;
+
+ /**
+ * Sets up the objects and creates an empty {@link #params}.
+ */
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ pub = mock(Publisher.class);
+ disp = mock(RequestIdDispatcher.class);
+ lock = new Object();
+
+ params = new RequestDataParams();
+ }
+
+ @Test
+ public void testGettersSetters() {
+ assertSame(params, params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp));
+
+ assertSame(pub, params.getPublisher());
+ assertSame(disp, params.getResponseDispatcher());
+ assertSame(lock, params.getModifyLock());
+ }
+
+ @Test
+ public void testValidate() {
+ // no exception
+ params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp).validate();
+ }
+
+ @Test
+ public void testValidate_MissingLock() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setPublisher(pub).setResponseDispatcher(disp).validate())
+ .withMessageContaining("Lock");
+ }
+
+ @Test
+ public void testValidate_MissingDispatcher() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setModifyLock(lock).setPublisher(pub).validate())
+ .withMessageContaining("Dispatcher");
+ }
+
+ @Test
+ public void testValidate_MissingPublisher() {
+ assertThatIllegalArgumentException().isThrownBy(
+ () -> params.setModifyLock(lock).setResponseDispatcher(disp).validate())
+ .withMessageContaining("publisher");
+ }
+}
diff --git a/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java b/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java
index cfa2ae92..6c9e092e 100644
--- a/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java
+++ b/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.FileInputStream;
@@ -35,6 +36,7 @@ import org.junit.Test;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.PolicyPapException;
+import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
import org.onap.policy.pap.main.parameters.CommonTestData;
import org.onap.policy.pap.main.parameters.PapParameterGroup;
import org.onap.policy.pap.main.parameters.PapParameterHandler;
@@ -75,6 +77,7 @@ public class TestPapActivator {
/**
* Method for cleanup after each test.
+ *
* @throws Exception if an error occurs
*/
@After
@@ -95,6 +98,7 @@ public class TestPapActivator {
// ensure items were added to the registry
assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class));
assertNotNull(Registry.get(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class));
+ assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class));
// repeat - should throw an exception
assertThatIllegalStateException().isThrownBy(() -> activator.start());
@@ -108,6 +112,11 @@ public class TestPapActivator {
activator.stop();
assertFalse(activator.isAlive());
+ // ensure items have been removed from the registry
+ assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_LOCK, Object.class, null));
+ assertNull(Registry.getOrDefault(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class, null));
+ assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class, null));
+
// repeat - should throw an exception
assertThatIllegalStateException().isThrownBy(() -> activator.stop());
assertFalse(activator.isAlive());