diff options
Diffstat (limited to 'main')
23 files changed, 4167 insertions, 12 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java new file mode 100644 index 00000000..f2ebca5c --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpModifyRequestMap.java @@ -0,0 +1,377 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy; +import org.onap.policy.pap.main.comm.msgdata.StateChangeData; +import org.onap.policy.pap.main.comm.msgdata.UpdateData; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; + +/** + * Maps a PDP name to requests that modify PDPs. + */ +public class PdpModifyRequestMap { + + /** + * Maps a PDP name to its request data. An entry is removed once all of the requests + * within the data have been completed. + */ + private final Map<String, ModifyReqData> name2data = new HashMap<>(); + + /** + * PDP modification lock. + */ + private final Object modifyLock; + + /** + * The configuration parameters. + */ + private final PdpModifyRequestMapParams params; + + /** + * Constructs the data. + * + * @param params configuration parameters + * + * @throws IllegalArgumentException if a required parameter is not set + */ + public PdpModifyRequestMap(PdpModifyRequestMapParams params) { + params.validate(); + + this.params = params; + this.modifyLock = params.getModifyLock(); + } + + /** + * Adds an UPDATE request to the map. + * + * @param update the UPDATE request or {@code null} + */ + public void addRequest(PdpUpdate update) { + addRequest(update, null); + } + + /** + * Adds STATE-CHANGE request to the map. + * + * @param stateChange the STATE-CHANGE request or {@code null} + */ + public void addRequest(PdpStateChange stateChange) { + addRequest(null, stateChange); + } + + /** + * Adds a pair of requests to the map. + * + * @param update the UPDATE request or {@code null} + * @param stateChange the STATE-CHANGE request or {@code null} + */ + public void addRequest(PdpUpdate update, PdpStateChange stateChange) { + if (update == null && stateChange == null) { + return; + } + + synchronized (modifyLock) { + String pdpName = getPdpName(update, stateChange); + + ModifyReqData data = name2data.get(pdpName); + if (data != null) { + // update the existing request + data.add(update); + data.add(stateChange); + + } else { + data = makeRequestData(update, stateChange); + name2data.put(pdpName, data); + data.startPublishing(); + } + } + } + + /** + * Gets the PDP name from two requests. + * + * @param update the update request, or {@code null} + * @param stateChange the state-change request, or {@code null} + * @return the PDP name, or {@code null} if both requests are {@code null} + */ + private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) { + String pdpName; + + if (update != null) { + if ((pdpName = update.getName()) == null) { + throw new IllegalArgumentException("missing name in " + update); + } + + if (stateChange != null && !pdpName.equals(stateChange.getName())) { + throw new IllegalArgumentException( + "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange); + } + + } else { + if ((pdpName = stateChange.getName()) == null) { + throw new IllegalArgumentException("missing name in " + stateChange); + } + } + + return pdpName; + } + + /** + * Determines if two requests are the "same", which is does not necessarily mean + * "equals". + * + * @param first first request to check + * @param second second request to check + * @return {@code true} if the requests are the "same", {@code false} otherwise + */ + protected static boolean isSame(PdpUpdate first, PdpUpdate second) { + if (first.getPolicies().size() != second.getPolicies().size()) { + return false; + } + + if (!first.getPdpGroup().equals(second.getPdpGroup())) { + return false; + } + + if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) { + return false; + } + + // see if the other has any policies that this does not have + ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies()); + lst.removeAll(first.getPolicies()); + + return lst.isEmpty(); + } + + /** + * Determines if two requests are the "same", which is does not necessarily mean + * "equals". + * + * @param first first request to check + * @param second second request to check + * @return {@code true} if this update subsumes the other, {@code false} otherwise + */ + protected static boolean isSame(PdpStateChange first, PdpStateChange second) { + return (first.getState() == second.getState()); + } + + /** + * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The + * UPDATE is always published before the STATE-CHANGE. In addition, both requests may + * be changed at any point, possibly triggering a restart of the publishing. + */ + public class ModifyReqData extends RequestData { + + /** + * The UPDATE message to be published, or {@code null}. + */ + private PdpUpdate update; + + /** + * The STATE-CHANGE message to be published, or {@code null}. + */ + private PdpStateChange stateChange; + + + /** + * Constructs the object. + * + * @param newUpdate the UPDATE message to be sent, or {@code null} + * @param newState the STATE-CHANGE message to be sent, or {@code null} + */ + public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) { + super(params); + + if (newUpdate != null) { + this.stateChange = newState; + setName(newUpdate.getName()); + update = newUpdate; + configure(new ModUpdateData(newUpdate)); + + } else { + this.update = null; + setName(newState.getName()); + stateChange = newState; + configure(new ModStateChangeData(newState)); + } + } + + /** + * Determines if this request is still in the map. + */ + @Override + protected boolean isActive() { + return (name2data.get(getName()) == this); + } + + /** + * Removes this request from the map. + */ + @Override + protected void allCompleted() { + name2data.remove(getName(), this); + } + + /** + * Adds an UPDATE to the request data, replacing any existing UPDATE, if + * appropriate. If the UPDATE is replaced, then publishing is restarted. + * + * @param newRequest the new UPDATE request + */ + private void add(PdpUpdate newRequest) { + if (newRequest == null) { + return; + } + + synchronized (modifyLock) { + if (update != null && isSame(update, newRequest)) { + // already have this update - discard it + return; + } + + // must restart from scratch + stopPublishing(); + + update = newRequest; + configure(new ModUpdateData(newRequest)); + + startPublishing(); + } + } + + /** + * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if + * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing + * the STATE-CHANGE, then publishing is restarted. + * + * @param newRequest the new STATE-CHANGE request + */ + private void add(PdpStateChange newRequest) { + if (newRequest == null) { + return; + } + + synchronized (modifyLock) { + if (stateChange != null && isSame(stateChange, newRequest)) { + // already have this update - discard it + return; + } + + if (getWrapper() instanceof StateChangeData) { + // we were publishing STATE-CHANGE, thus must restart it + stopPublishing(); + + stateChange = newRequest; + configure(new ModStateChangeData(newRequest)); + + startPublishing(); + + } else { + // haven't started publishing STATE-CHANGE yet, just replace it + stateChange = newRequest; + } + } + } + + /** + * Indicates that the retry count was exhausted. + */ + protected void retryCountExhausted() { + // remove this request data from the PDP request map + allCompleted(); + + // TODO what to do? + } + + /** + * Indicates that a response did not match the data. + * + * @param reason the reason for the mismatch + */ + protected void mismatch(String reason) { + // remove this request data from the PDP request map + allCompleted(); + + // TODO what to do? + } + + /** + * Wraps an UPDATE. + */ + private class ModUpdateData extends UpdateData { + + public ModUpdateData(PdpUpdate message) { + super(message, params); + } + + @Override + public void mismatch(String reason) { + ModifyReqData.this.mismatch(reason); + } + + @Override + public void completed() { + if (stateChange == null) { + // no STATE-CHANGE request - we're done + allCompleted(); + + } else { + // now process the STATE-CHANGE request + configure(new ModStateChangeData(stateChange)); + startPublishing(); + } + } + } + + /** + * Wraps a STATE-CHANGE. + */ + private class ModStateChangeData extends StateChangeData { + + public ModStateChangeData(PdpStateChange message) { + super(message, params); + } + + @Override + public void mismatch(String reason) { + ModifyReqData.this.mismatch(reason); + } + + @Override + public void completed() { + allCompleted(); + } + } + } + + // these may be overridden by junit tests + + protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) { + return new ModifyReqData(update, stateChange); + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java new file mode 100644 index 00000000..6032d17e --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/Publisher.java @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.pap.main.PolicyPapException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Publishes messages to a topic. Maintains a queue of references to data that is to be + * published. Once the publisher removes a reference from a queue, it sets it to + * {@link null} to indicate that it is being processed. Until it has been set to + * {@link null}, clients are free to atomically update the reference to new values, thus + * maintaining their place in the queue. + * + * <p>This class has not been tested for multiple threads invoking {@link #run()} + * simultaneously. + */ +public class Publisher implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(Publisher.class); + + /** + * Used to send to the topic. + */ + private final TopicSinkClient client; + + /** + * Request queue. The references may contain {@code null}. + */ + private final BlockingQueue<QueueToken<PdpMessage>> queue = new LinkedBlockingQueue<>(); + + /** + * Set to {@code true} to cause the publisher to stop running. + */ + private volatile boolean stopNow = false; + + /** + * Constructs the object. + * + * @param topic name of the topic to which to publish + * @throws PolicyPapException if the topic sink does not exist + */ + public Publisher(String topic) throws PolicyPapException { + try { + this.client = new TopicSinkClient(topic); + } catch (TopicSinkClientException e) { + throw new PolicyPapException(e); + } + } + + /** + * Stops the publisher, if it's running. + */ + public void stop() { + stopNow = true; + + // add an empty reference so the thread doesn't block on the queue + queue.add(new QueueToken<>(null)); + } + + /** + * Adds an item to the queue. The referenced objects are assumed to be POJOs and will + * be converted to JSON via the {@link StandardCoder} prior to publishing. + * + * @param ref reference to the message to be published + */ + public void enqueue(QueueToken<PdpMessage> ref) { + queue.add(ref); + } + + /** + * Continuously publishes items in the queue until {@link #stop()} is invoked. + */ + @Override + public void run() { + for (;;) { + QueueToken<PdpMessage> token = getNext(); + + if (stopNow) { + // unblock any other publisher threads + queue.offer(new QueueToken<>(null)); + break; + } + + PdpMessage data = token.replaceItem(null); + if (data == null) { + continue; + } + + client.send(data); + } + } + + /** + * Gets the next item from the queue. If the thread is interrupted, then it sets + * {@link #stopNow}. + * + * @return the next item, or a reference containing {@code null} if this is + * interrupted + */ + private QueueToken<PdpMessage> getNext() { + try { + return queue.take(); + + } catch (InterruptedException e) { + logger.warn("Publisher stopping due to interrupt"); + stopNow = true; + Thread.currentThread().interrupt(); + return new QueueToken<>(null); + } + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java b/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java new file mode 100644 index 00000000..a68b7c05 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/QueueToken.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Token that can be placed within a publisher's queue. The item that a token references + * may be replaced any time up until it is set to {@code null}. Once it has been set to + * {@code null}, it cannot be replaced. + * + * @param <T> type of object referenced by the token + */ +public class QueueToken<T> { + + /** + * Wraps the item. + */ + private final AtomicReference<T> ref; + + /** + * Constructs the object. + * + * @param item initial token item + */ + public QueueToken(T item) { + ref = new AtomicReference<>(item); + } + + /** + * Gets the item referenced by this token. + * + * @return the item referenced by this token + */ + public final T get() { + return ref.get(); + } + + /** + * Replaces the token's item. If the current item is {@code null}, then it is left + * unchanged. + * + * @param newItem the new item + * @return the original item + */ + public T replaceItem(T newItem) { + T oldItem; + while ((oldItem = ref.get()) != null) { + if (ref.compareAndSet(oldItem, newItem)) { + break; + } + } + + // it was already null, or we successfully replaced the item + return oldItem; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java b/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java new file mode 100644 index 00000000..29ad85bc --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/RequestData.java @@ -0,0 +1,296 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.services.ServiceManager; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.msgdata.MessageData; +import org.onap.policy.pap.main.parameters.RequestDataParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Request data, the content of which may be changed at any point, possibly triggering a + * restart of the publishing. + */ +public abstract class RequestData { + private static final Logger logger = LoggerFactory.getLogger(RequestData.class); + + /** + * Name with which this data is associated, used for logging purposes. + */ + @Getter + @Setter(AccessLevel.PROTECTED) + private String name; + + /** + * The configuration parameters. + */ + private final RequestDataParams params; + + /** + * Current retry count. + */ + private int retryCount = 0; + + /** + * Used to register/unregister the listener and the timer. + */ + private ServiceManager svcmgr; + + /** + * Wrapper for the message that is currently being published (i.e., {@link #update} or + * {@link #stateChange}. + */ + @Getter(AccessLevel.PROTECTED) + private MessageData wrapper; + + /** + * Used to cancel a timer. + */ + private TimerManager.Timer timer; + + /** + * Token that is placed on the queue. + */ + private QueueToken<PdpMessage> token = null; + + + /** + * Constructs the object, and validates the parameters. + * + * @param params configuration parameters + * + * @throws IllegalArgumentException if a required parameter is not set + */ + public RequestData(@NonNull RequestDataParams params) { + params.validate(); + + this.params = params; + } + + /** + * Starts the publishing process, registering any listeners or timeout handlers, and + * adding the request to the publisher queue. This should not be invoked until after + * {@link #configure(MessageData)} is invoked. + */ + public void startPublishing() { + + synchronized (params.getModifyLock()) { + if (!svcmgr.isAlive()) { + svcmgr.start(); + } + } + } + + /** + * Unregisters the listener and cancels the timer. + */ + protected void stopPublishing() { + if (svcmgr.isAlive()) { + svcmgr.stop(); + } + } + + /** + * Configures the fields based on the {@link #message} type. + * + * @param newWrapper the new message wrapper + */ + protected void configure(MessageData newWrapper) { + + wrapper = newWrapper; + + resetRetryCount(); + + TimerManager timerManager = wrapper.getTimers(); + String msgType = wrapper.getType(); + String reqid = wrapper.getMessage().getRequestId(); + + /* + * We have to configure the service manager HERE, because it's name changes if the + * message class changes. + */ + + // @formatter:off + this.svcmgr = new ServiceManager(name + " " + msgType) + .addAction("listener", + () -> params.getResponseDispatcher().register(reqid, this::processResponse), + () -> params.getResponseDispatcher().unregister(reqid)) + .addAction("timer", + () -> timer = timerManager.register(name, this::handleTimeout), + () -> timer.cancel()) + .addAction("enqueue", + () -> enqueue(), + () -> { + // nothing to "stop" + }); + // @formatter:on + } + + /** + * Enqueues the current message with the publisher, putting it into the queue token, + * if possible. Otherwise, it adds a new token to the queue. + */ + private void enqueue() { + PdpMessage message = wrapper.getMessage(); + if (token != null && token.replaceItem(message) != null) { + // took the other's place in the queue - continue using the token + return; + } + + // couldn't take the other's place - add our own token to the queue + token = new QueueToken<>(message); + params.getPublisher().enqueue(token); + } + + /** + * Resets the retry count. + */ + protected void resetRetryCount() { + retryCount = 0; + } + + /** + * Bumps the retry count. + * + * @return {@code true} if successful, {@code false} if the limit has been reached + */ + protected boolean bumpRetryCount() { + if (retryCount >= wrapper.getMaxRetryCount()) { + return false; + } + + retryCount++; + return true; + } + + /** + * Indicates that the retry count was exhausted. The default method simply invokes + * {@link #allCompleted()}. + */ + protected void retryCountExhausted() { + // remove this request data from the PDP request map + allCompleted(); + } + + /** + * Processes a response received from the PDP. + * + * @param infra infrastructure on which the response was received + * @param topic topic on which the response was received + * @param response the response + */ + private void processResponse(CommInfrastructure infra, String topic, PdpStatus response) { + + synchronized (params.getModifyLock()) { + if (!svcmgr.isAlive()) { + // this particular request must have been discarded + return; + } + + stopPublishing(); + + if (!isActive()) { + return; + } + + String reason = wrapper.checkResponse(response); + if (reason != null) { + logger.info("{} PDP data mismatch: {}", getName(), reason); + wrapper.mismatch(reason); + + } else { + logger.info("{} {} successful", getName(), wrapper.getType()); + wrapper.completed(); + } + } + } + + /** + * Handles a timeout. + * + * @param timerName the timer timer + */ + private void handleTimeout(String timerName) { + + synchronized (params.getModifyLock()) { + if (!svcmgr.isAlive()) { + // this particular request must have been discarded + return; + } + + stopPublishing(); + + if (!isActive()) { + return; + } + + if (isInQueue()) { + // haven't published yet - just leave it in the queue and reset counts + logger.info("{} timeout - request still in the queue", getName()); + resetRetryCount(); + startPublishing(); + return; + } + + if (!bumpRetryCount()) { + logger.info("{} timeout - retry count exhausted", getName()); + retryCountExhausted(); + return; + } + + // re-publish + logger.info("{} timeout - re-publish", getName()); + startPublishing(); + } + } + + /** + * Determines if the current message is still in the queue. Assumes that + * {@link #startPublishing()} has been invoked and thus {@link #token} has been + * initialized. + * + * @return {@code true} if the current message is in the queue, {@code false} + * otherwise + */ + private boolean isInQueue() { + return (token.get() == wrapper.getMessage()); + } + + /** + * Determines if this request data is still active. + * + * @return {@code true} if this request is active, {@code false} otherwise + */ + protected abstract boolean isActive(); + + /** + * Indicates that this entire request has completed. + */ + protected abstract void allCompleted(); +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java b/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java new file mode 100644 index 00000000..9748b0b5 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/TimerManager.java @@ -0,0 +1,310 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manager of timers. All of the timers for a given manager have the same wait time, which + * makes it possible to use a linked hash map to track the timers. As a result, timers can + * be quickly added and removed. In addition, the expiration time of any new timer is + * always greater than or equal to the timers that are already in the map. Consequently, + * the map's iterator will go in ascending order from the minimum expiration time to + * maximum expiration time. + * + * <p>This class has not been tested for multiple threads invoking {@link #run()} + * simultaneously. + */ +public class TimerManager implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(TimerManager.class); + + /** + * Name of this manager, used for logging purposes. + */ + private final String name; + + /** + * Time that each new timer should wait. + */ + private final long waitTimeMs; + + /** + * When the map is empty, the timer thread will block waiting for this semaphore. When + * a new timer is added to the map, the semaphore will be released, thus allowing the + * timer thread to progress. + */ + private final Semaphore sem = new Semaphore(0); + + /** + * This is decremented to indicate that this manager should be stopped. + */ + private final CountDownLatch stopper = new CountDownLatch(1); + + /** + * Used to lock updates to the map. + */ + private final Object lockit = new Object(); + + /** + * Maps a timer name to a timer. + */ + private final Map<String, Timer> name2timer = new LinkedHashMap<>(); + + /** + * Constructs the object. + * + * @param name name of this manager, used for logging purposes + * @param waitTimeMs time that each new timer should wait + */ + public TimerManager(String name, long waitTimeMs) { + this.name = name; + this.waitTimeMs = waitTimeMs; + } + + /** + * Stops the timer thread. + */ + public void stop() { + logger.info("timer manager {} stopping", name); + + // Note: Must decrement the latch BEFORE releasing the semaphore + stopper.countDown(); + sem.release(); + } + + /** + * Registers a timer with the given name. When the timer expires, it is automatically + * unregistered and then executed. + * + * @param timerName name of the timer to register + * @param action action to take when the timer expires; the "timerName" is passed as + * the only argument + * @return the timer + */ + public Timer register(String timerName, Consumer<String> action) { + + synchronized (lockit) { + Timer timer = new Timer(timerName, action); + + // always remove existing entry so that new entry goes at the end of the map + name2timer.remove(timerName); + name2timer.put(timerName, timer); + + logger.info("{} timer registered {}", name, timer); + + if (name2timer.size() == 1) { + // release the timer thread + sem.release(); + } + + return timer; + } + } + + /** + * Continuously processes timers until {@link #stop()} is invoked. + */ + @Override + public void run() { + logger.info("timer manager {} started", name); + + while (stopper.getCount() > 0) { + + try { + sem.acquire(); + sem.drainPermits(); + + processTimers(); + + } catch (InterruptedException e) { + logger.warn("timer manager {} stopping due to interrupt", name); + stopper.countDown(); + Thread.currentThread().interrupt(); + } + } + + logger.info("timer manager {} stopped", name); + } + + /** + * Process all timers, continuously, as long as a timer remains in the map (and + * {@link #stop()} has not been called). + * + * @throws InterruptedException if the thread is interrupted + */ + private void processTimers() throws InterruptedException { + Timer timer; + while ((timer = getNextTimer()) != null && stopper.getCount() > 0) { + processTimer(timer); + } + } + + /** + * Gets the timer that will expire first. + * + * @return the timer that will expire first, or {@code null} if there are no timers + */ + private Timer getNextTimer() { + + synchronized (lockit) { + if (name2timer.isEmpty()) { + return null; + } + + // use an iterator to get the first timer in the map + return name2timer.values().iterator().next(); + } + } + + /** + * Process a timer, waiting until it expires, unregistering it, and then executing its + * action. + * + * @param timer timer to process + * @throws InterruptedException if the thread is interrupted + */ + private void processTimer(Timer timer) throws InterruptedException { + timer.await(); + + if (stopper.getCount() == 0) { + // stop() was called + return; + } + + if (!timer.cancel()) { + // timer was cancelled while we were waiting + return; + } + + + // run the timer + try { + logger.info("{} timer expired {}", TimerManager.this.name, timer); + timer.runner.accept(timer.name); + } catch (RuntimeException e) { + logger.warn("{} timer threw an exception {}", TimerManager.this.name, timer, e); + } + } + + /** + * Timer info. + */ + public class Timer { + /** + * The timer's name. + */ + private String name; + + /** + * Time, in milliseconds, when the timer will expire. + */ + private long expireMs; + + /** + * Action to take when the timer expires. + */ + private Consumer<String> runner; + + + private Timer(String name, Consumer<String> runner2) { + this.name = name; + this.expireMs = waitTimeMs + currentTimeMillis(); + this.runner = runner2; + } + + private void await() throws InterruptedException { + // wait for it to expire, if necessary + long tleft = expireMs - currentTimeMillis(); + if (tleft > 0) { + logger.info("{} timer waiting {}ms {}", TimerManager.this.name, tleft, this); + sleep(tleft); + } + } + + /** + * Cancels the timer. + * + * @return {@code true} if the timer was cancelled, {@code false} if the timer was + * not running + */ + public boolean cancel() { + + AtomicBoolean wasPresent = new AtomicBoolean(false); + + synchronized (lockit) { + + name2timer.computeIfPresent(name, (key, val) -> { + + if (val == this) { + wasPresent.set(true); + return null; + + } else { + return val; + } + }); + + if (!wasPresent.get()) { + // have a new timer in the map - ignore "this" timer + logger.info("{} timer replaced {}", TimerManager.this.name, this); + return false; + } + + logger.debug("{} timer cancelled {}", TimerManager.this.name, this); + return true; + } + } + + @Override + public String toString() { + return "Timer [name=" + name + ", expireMs=" + expireMs + "]"; + } + } + + // these may be overridden by junit tests + + /** + * Gets the current time, in milli-seconds. + * + * @return the current time, in milli-seconds + */ + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + /** + * "Sleeps" for a bit, stopping if {@link #stop()} is invoked. + * + * @param timeMs time, in milli-seconds, to sleep + * @throws InterruptedException if this thread is interrupted while sleeping + */ + protected void sleep(long timeMs) throws InterruptedException { + stopper.await(timeMs, TimeUnit.MILLISECONDS); + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java new file mode 100644 index 00000000..aa288f7d --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/MessageData.java @@ -0,0 +1,104 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.TimerManager; + + +/** + * Wraps a message, providing methods appropriate to the message type. + */ +public abstract class MessageData { + private final PdpMessage message; + private final int maxRetries; + private final TimerManager timers; + + /** + * Constructs the object. + * + * @param message message to be wrapped by this + * @param maxRetries max number of retries + * @param timers the timer manager for messages of this type + */ + public MessageData(PdpMessage message, int maxRetries, TimerManager timers) { + this.message = message; + this.maxRetries = maxRetries; + this.timers = timers; + } + + /** + * Gets the wrapped message. + * + * @return the wrapped message + */ + public PdpMessage getMessage() { + return message; + } + + /** + * Gets a string, suitable for logging, identifying the message type. + * + * @return the message type + */ + public String getType() { + return message.getClass().getSimpleName(); + } + + /** + * Gets the maximum retry count for the particular message type. + * + * @return the maximum retry count + */ + public int getMaxRetryCount() { + return maxRetries; + } + + /** + * Gets the timer manager for the particular message type. + * + * @return the timer manager + */ + public TimerManager getTimers() { + return timers; + } + + /** + * Indicates that the response did not match what was expected. + * + * @param reason the reason for the mismatch + */ + public abstract void mismatch(String reason); + + /** + * Indicates that processing of this particular message has completed successfully. + */ + public abstract void completed(); + + /** + * Checks the response to ensure it is as expected. + * + * @param response the response to check + * @return an error message, if a fatal error has occurred, {@code null} otherwise + */ + public abstract String checkResponse(PdpStatus response); +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java new file mode 100644 index 00000000..ecbf5dfa --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/StateChangeData.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; + +/** + * Wraps a STATE-CHANGE. + */ +public abstract class StateChangeData extends MessageData { + private PdpStateChange stateChange; + + /** + * Constructs the object. + * + * @param message message to be wrapped by this + * @param params the parameters + */ + public StateChangeData(PdpStateChange message, PdpModifyRequestMapParams params) { + super(message, params.getParams().getStateChangeParameters().getMaxRetryCount(), params.getStateChangeTimers()); + + stateChange = message; + } + + @Override + public String checkResponse(PdpStatus response) { + if (!stateChange.getName().equals(response.getName())) { + return "name does not match"; + } + + if (response.getState() != stateChange.getState()) { + return "state is " + response.getState() + ", but expected " + stateChange.getState(); + } + + return null; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java new file mode 100644 index 00000000..904c8237 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/msgdata/UpdateData.java @@ -0,0 +1,78 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import java.util.ArrayList; +import java.util.List; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; + + +/** + * Wraps an UPDATE. + */ +public abstract class UpdateData extends MessageData { + private PdpUpdate update; + + /** + * Constructs the object. + * + * @param message message to be wrapped by this + * @param params the parameters + */ + public UpdateData(PdpUpdate message, PdpModifyRequestMapParams params) { + super(message, params.getParams().getUpdateParameters().getMaxRetryCount(), params.getUpdateTimers()); + + update = message; + } + + @Override + public String checkResponse(PdpStatus response) { + if (!update.getName().equals(response.getName())) { + return "name does not match"; + } + + if (!update.getPdpGroup().equals(response.getPdpGroup())) { + return "group does not match"; + } + + if (!update.getPdpSubgroup().equals(response.getPdpSubgroup())) { + return "subgroup does not match"; + } + + // see if the other has any policies that this does not have + ArrayList<ToscaPolicy> lst = new ArrayList<>(response.getPolicies()); + List<ToscaPolicy> mypolicies = update.getPolicies(); + + if (mypolicies.size() != lst.size()) { + return "policies do not match"; + } + + lst.removeAll(update.getPolicies()); + if (!lst.isEmpty()) { + return "policies do not match"; + } + + return null; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java new file mode 100644 index 00000000..2c17a0b2 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpModifyRequestMapParams.java @@ -0,0 +1,88 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.parameters; + +import lombok.Getter; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.TimerManager; + + +/** + * Parameters needed to create a {@link PdpModifyRequestMapParams}. + */ +@Getter +public class PdpModifyRequestMapParams extends RequestDataParams { + private PdpParameters params; + private TimerManager updateTimers; + private TimerManager stateChangeTimers; + + public PdpModifyRequestMapParams setParams(PdpParameters params) { + this.params = params; + return this; + } + + public PdpModifyRequestMapParams setUpdateTimers(TimerManager updateTimers) { + this.updateTimers = updateTimers; + return this; + } + + public PdpModifyRequestMapParams setStateChangeTimers(TimerManager stateChangeTimers) { + this.stateChangeTimers = stateChangeTimers; + return this; + } + + @Override + public PdpModifyRequestMapParams setPublisher(Publisher publisher) { + super.setPublisher(publisher); + return this; + } + + @Override + public PdpModifyRequestMapParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) { + super.setResponseDispatcher(responseDispatcher); + return this; + } + + @Override + public PdpModifyRequestMapParams setModifyLock(Object modifyLock) { + super.setModifyLock(modifyLock); + return this; + } + + @Override + public void validate() { + super.validate(); + + if (params == null) { + throw new IllegalArgumentException("missing PDP parameters"); + } + + if (updateTimers == null) { + throw new IllegalArgumentException("missing updateTimers"); + } + + if (stateChangeTimers == null) { + throw new IllegalArgumentException("missing stateChangeTimers"); + } + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java new file mode 100644 index 00000000..ea4b02ca --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/RequestDataParams.java @@ -0,0 +1,70 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.parameters; + +import lombok.Getter; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.RequestData; + + +/** + * Parameters needed to create a {@link RequestData}. + */ +@Getter +public class RequestDataParams { + private Publisher publisher; + private RequestIdDispatcher<PdpStatus> responseDispatcher; + private Object modifyLock; + + public RequestDataParams setPublisher(Publisher publisher) { + this.publisher = publisher; + return this; + } + + public RequestDataParams setResponseDispatcher(RequestIdDispatcher<PdpStatus> responseDispatcher) { + this.responseDispatcher = responseDispatcher; + return this; + } + + public RequestDataParams setModifyLock(Object modifyLock) { + this.modifyLock = modifyLock; + return this; + } + + /** + * Validates the parameters. + */ + public void validate() { + if (publisher == null) { + throw new IllegalArgumentException("missing publisher"); + } + + if (responseDispatcher == null) { + throw new IllegalArgumentException("missing responseDispatcher"); + } + + if (modifyLock == null) { + throw new IllegalArgumentException("missing modifyLock"); + } + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 913e661d..ed880aeb 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -23,6 +23,7 @@ package org.onap.policy.pap.main.startstop; import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; @@ -34,7 +35,12 @@ import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.enums.PdpMessageType; import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyPapRuntimeException; +import org.onap.policy.pap.main.comm.PdpModifyRequestMap; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.TimerManager; import org.onap.policy.pap.main.parameters.PapParameterGroup; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.PdpParameters; import org.onap.policy.pap.main.rest.PapRestServer; import org.onap.policy.pap.main.rest.PapStatisticsManager; @@ -81,8 +87,6 @@ public class PapActivator extends ServiceManagerContainer { try { this.papParameterGroup = papParameterGroup; - papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName()); - this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); @@ -90,18 +94,26 @@ public class PapActivator extends ServiceManagerContainer { throw new PolicyPapRuntimeException(e); } - this.msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher); + papParameterGroup.getRestServerParameters().setName(papParameterGroup.getName()); final Object pdpUpdateLock = new Object(); + PdpParameters pdpParams = papParameterGroup.getPdpParameters(); + AtomicReference<Publisher> pdpPub = new AtomicReference<>(); + AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>(); + AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>(); // @formatter:off addAction("PAP parameters", () -> ParameterService.register(papParameterGroup), () -> ParameterService.deregister(papParameterGroup.getName())); - addAction("dispatcher", - () -> registerDispatcher(), - () -> unregisterDispatcher()); + addAction("Request ID Dispatcher", + () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), + () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); + + addAction("Message Dispatcher", + () -> registerMsgDispatcher(), + () -> unregisterMsgDispatcher()); addAction("topics", () -> TopicEndpoint.manager.start(), @@ -111,21 +123,69 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.register(PapConstants.REG_STATISTICS_MANAGER, new PapStatisticsManager()), () -> Registry.unregister(PapConstants.REG_STATISTICS_MANAGER)); + addAction("PDP publisher", + () -> { + pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP)); + startThread(pdpPub.get()); + }, + () -> pdpPub.get().stop()); + + addAction("PDP update timers", + () -> { + pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs())); + startThread(pdpUpdTimers.get()); + }, + () -> pdpUpdTimers.get().stop()); + + addAction("PDP state-change timers", + () -> { + pdpStChgTimers.set(new TimerManager("state-change", pdpParams.getUpdateParameters().getMaxWaitMs())); + startThread(pdpStChgTimers.get()); + }, + () -> pdpStChgTimers.get().stop()); + addAction("PDP modification lock", () -> Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, pdpUpdateLock), () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK)); - addAction("REST server", - () -> restServer = new PapRestServer(papParameterGroup.getRestServerParameters()), - () -> { }); + addAction("PDP modification requests", + () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap( + new PdpModifyRequestMapParams() + .setModifyLock(pdpUpdateLock) + .setParams(pdpParams) + .setPublisher(pdpPub.get()) + .setResponseDispatcher(reqIdDispatcher) + .setStateChangeTimers(pdpStChgTimers.get()) + .setUpdateTimers(pdpUpdTimers.get()))), + () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP)); + + addAction("Create REST server", + () -> { + restServer = new PapRestServer(papParameterGroup.getRestServerParameters()); + }, + () -> { + restServer = null; + }); - addAction("REST server thread", + addAction("REST server", () -> restServer.start(), () -> restServer.stop()); // @formatter:on } /** + * Starts a background thread. + * + * @param runner function to run in the background + */ + private void startThread(Runnable runner) { + Thread thread = new Thread(runner); + thread.setDaemon(true); + + thread.start(); + } + + /** * Get the parameters used by the activator. * * @return the parameters of the activator @@ -137,7 +197,7 @@ public class PapActivator extends ServiceManagerContainer { /** * Registers the dispatcher with the topic source(s). */ - private void registerDispatcher() { + private void registerMsgDispatcher() { for (TopicSource source : TopicEndpoint.manager .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { source.register(msgDispatcher); @@ -147,7 +207,7 @@ public class PapActivator extends ServiceManagerContainer { /** * Unregisters the dispatcher from the topic source(s). */ - private void unregisterDispatcher() { + private void unregisterMsgDispatcher() { for (TopicSource source : TopicEndpoint.manager .getTopicSources(Arrays.asList(PapConstants.TOPIC_POLICY_PDP_PAP))) { source.unregister(msgDispatcher); diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java new file mode 100644 index 00000000..bbe75a44 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/PdpModifyRequestMapTest.java @@ -0,0 +1,575 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.models.base.PfConceptKey; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.comm.PdpModifyRequestMap.ModifyReqData; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.PdpParameters; +import org.onap.policy.pap.main.parameters.PdpStateChangeParameters; +import org.onap.policy.pap.main.parameters.PdpUpdateParameters; +import org.powermock.reflect.Whitebox; + +public class PdpModifyRequestMapTest { + private static final String DIFFERENT = "-diff"; + private static final String PDP1 = "pdp_1"; + + private static final int UPDATE_RETRIES = 2; + private static final int STATE_RETRIES = 1; + + private PdpModifyRequestMap map; + private Publisher pub; + private RequestIdDispatcher<PdpStatus> disp; + private Object lock; + private TimerManager updTimers; + private TimerManager stateTimers; + private TimerManager.Timer timer; + private Queue<QueueToken<PdpMessage>> queue; + private PdpStatus response; + private PdpParameters pdpParams; + private PdpUpdateParameters updParams; + private PdpStateChangeParameters stateParams; + private PdpUpdate update; + private PdpStateChange state; + private String mismatchReason; + + /** + * Sets up. + */ + @Before + @SuppressWarnings("unchecked") + public void setUp() { + pub = mock(Publisher.class); + disp = mock(RequestIdDispatcher.class); + lock = new Object(); + updTimers = mock(TimerManager.class); + stateTimers = mock(TimerManager.class); + timer = mock(TimerManager.Timer.class); + queue = new LinkedList<>(); + response = new PdpStatus(); + pdpParams = mock(PdpParameters.class); + updParams = mock(PdpUpdateParameters.class); + stateParams = mock(PdpStateChangeParameters.class); + update = makeUpdate(); + state = makeStateChange(); + mismatchReason = null; + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + queue.add(invocation.getArgumentAt(0, QueueToken.class)); + return null; + } + }).when(pub).enqueue(any()); + + when(updTimers.register(any(), any())).thenReturn(timer); + when(stateTimers.register(any(), any())).thenReturn(timer); + + when(pdpParams.getUpdateParameters()).thenReturn(updParams); + when(pdpParams.getStateChangeParameters()).thenReturn(stateParams); + + when(updParams.getMaxRetryCount()).thenReturn(UPDATE_RETRIES); + when(updParams.getMaxWaitMs()).thenReturn(1000L); + + when(stateParams.getMaxRetryCount()).thenReturn(STATE_RETRIES); + when(stateParams.getMaxWaitMs()).thenReturn(1000L); + + response.setName(PDP1); + response.setState(PdpState.SAFE); + response.setPdpGroup(update.getPdpGroup()); + response.setPdpSubgroup(update.getPdpSubgroup()); + response.setPolicies(update.getPolicies()); + + map = new PdpModifyRequestMap(makeParameters()) { + + @Override + protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) { + return new ModifyReqData(update, stateChange) { + @Override + protected void mismatch(String reason) { + mismatchReason = reason; + super.mismatch(reason); + } + }; + } + }; + + map = spy(map); + } + + @Test + public void testAdd_DifferentPdps() { + map.addRequest(update); + + state.setName(DIFFERENT); + map.addRequest(state); + + assertNotNull(getReqData(PDP1)); + assertNotNull(getReqData(DIFFERENT)); + + assertQueueContains("testAdd_DifferentPdps", update, state); + } + + @Test + public void testAddRequestPdpUpdate() { + map.addRequest(update); + + assertQueueContains("testAddRequestPdpUpdate", update); + } + + @Test + public void testAddRequestPdpStateChange() { + map.addRequest(state); + + assertQueueContains("testAddRequestPdpStateChange", state); + } + + @Test + public void testAddRequestPdpUpdatePdpStateChange_Both() { + map.addRequest(update, state); + + assertQueueContains("testAddRequestPdpUpdatePdpStateChange_Both", update); + } + + @Test + public void testAddRequestPdpUpdatePdpStateChange_BothNull() { + map.addRequest(null, null); + + // nothing should have been added to the queue + assertTrue(queue.isEmpty()); + } + + @Test + public void testGetPdpName_SameNames() { + // should be no exception + map.addRequest(update, state); + } + + @Test + public void testGetPdpName_DifferentNames() { + // should be no exception + state.setName(update.getName() + "X"); + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state)) + .withMessageContaining("does not match"); + } + + @Test + public void testGetPdpName_NullUpdateName() { + update.setName(null); + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update)).withMessageContaining("update"); + + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state)) + .withMessageContaining("update"); + + // both names are null + state.setName(null); + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state)); + } + + @Test + public void testGetPdpName_NullStateName() { + state.setName(null); + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(state)).withMessageContaining("state"); + + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state)) + .withMessageContaining("state"); + + // both names are null + update.setName(null); + assertThatIllegalArgumentException().isThrownBy(() -> map.addRequest(update, state)); + } + + @Test + public void testIsSamePdpUpdatePdpUpdate() { + map.addRequest(update); + + // queue a similar request + PdpUpdate update2 = makeUpdate(); + map.addRequest(update2); + + // token should still have original message + assertQueueContains("testIsSamePdpUpdatePdpUpdate", update); + } + + @Test + public void testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount() { + map.addRequest(update); + + PdpUpdate update2 = makeUpdate(); + update2.setPolicies(Arrays.asList(update.getPolicies().get(0))); + map.addRequest(update2); + + // should have replaced the message in the token + assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicyCount", update2); + } + + @Test + public void testIsSamePdpUpdatePdpUpdate_DifferentGroup() { + map.addRequest(update); + + // queue a similar request + PdpUpdate update2 = makeUpdate(); + update2.setPdpGroup(update.getPdpGroup() + DIFFERENT); + map.addRequest(update2); + + // should have replaced the message in the token + assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentGroup", update2); + } + + @Test + public void testIsSamePdpUpdatePdpUpdate_DifferentSubGroup() { + map.addRequest(update); + + PdpUpdate update2 = makeUpdate(); + update2.setPdpSubgroup(update.getPdpSubgroup() + DIFFERENT); + map.addRequest(update2); + + // should have replaced the message in the token + assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentSubGroup", update2); + } + + @Test + public void testIsSamePdpUpdatePdpUpdate_DifferentPolicies() { + map.addRequest(update); + + ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies()); + policies.set(0, new ToscaPolicy(new PfConceptKey("policy-3-x", "2.0.0"))); + + PdpUpdate update2 = makeUpdate(); + update2.setPolicies(policies); + map.addRequest(update2); + + // should have replaced the message in the token + assertQueueContains("testIsSamePdpUpdatePdpUpdate_DifferentPolicies", update2); + } + + @Test + public void testIsSamePdpStateChangePdpStateChange() { + map.addRequest(state); + + // queue a similar request + PdpStateChange state2 = makeStateChange(); + map.addRequest(state2); + + // token should still have original message + assertQueueContains("testIsSamePdpStateChangePdpStateChange", state); + } + + @Test + public void testIsSamePdpStateChangePdpStateChange_DifferentState() { + map.addRequest(state); + + // queue a similar request + PdpStateChange state2 = makeStateChange(); + state2.setState(PdpState.TERMINATED); + map.addRequest(state2); + + // should have replaced the message in the token + assertQueueContains("testIsSamePdpStateChangePdpStateChange_DifferentState", state2); + } + + @Test + public void testModifyReqDataIsActive() { + map.addRequest(update); + + invokeProcessResponse(); + + // name should have been removed + assertNull(getReqData(PDP1)); + } + + @Test + public void testModifyReqDataAddPdpUpdate() { + map.addRequest(state); + + map.addRequest(update); + + // update should have replaced the state-change in the queue + assertQueueContains("testModifyReqDataAddPdpUpdate", update); + } + + @Test + public void testModifyReqDataAddPdpStateChange() { + map.addRequest(update); + + map.addRequest(state); + + // update should still be in the queue + assertQueueContains("testModifyReqDataAddPdpStateChange", update); + } + + @Test + public void testModifyReqDataRetryCountExhausted() { + map.addRequest(state); + + // timeout twice so that retry count is exhausted + invokeTimeoutHandler(stateTimers, STATE_RETRIES + 1); + + // name should have been removed + assertNull(getReqData(PDP1)); + } + + @Test + public void testModifyReqDataMismatch() { + map.addRequest(state); + + // set up a response with incorrect info + response.setName(state.getName() + DIFFERENT); + + invokeProcessResponse(); + + assertNotNull(mismatchReason); + + // name should have been removed + assertNull(getReqData(PDP1)); + } + + @Test + public void testUpdateDataGetMaxRetryCount() { + map.addRequest(update); + ModifyReqData reqdata = getReqData(PDP1); + + for (int count = 0; count < UPDATE_RETRIES; ++count) { + assertTrue("update bump " + count, reqdata.bumpRetryCount()); + } + + assertFalse("update bump final", reqdata.bumpRetryCount()); + } + + @Test + public void testUpdateDataMismatch() { + map.addRequest(update); + + response.setName(DIFFERENT); + invokeProcessResponse(); + + assertNull(getReqData(PDP1)); + } + + @Test + public void testUpdateDataComplete() { + map.addRequest(update); + + invokeProcessResponse(); + + assertNull(getReqData(PDP1)); + } + + @Test + public void testUpdateDataComplete_MoreToGo() { + map.addRequest(update, state); + + invokeProcessResponse(); + + assertNotNull(getReqData(PDP1)); + + assertSame(state, queue.poll().get()); + } + + @Test + public void testStateChangeDataMismatch() { + map.addRequest(state); + + response.setName(DIFFERENT); + invokeProcessResponse(); + + assertNull(getReqData(PDP1)); + } + + @Test + public void testStateChangeDataCompleted() { + map.addRequest(state); + + invokeProcessResponse(); + + assertNull(getReqData(PDP1)); + } + + @Test + public void testMakeRequestData() { + // need a map that doesn't override the method + map = new PdpModifyRequestMap(makeParameters()); + + // this will invoke makeRequestData() - should not throw an exception + map.addRequest(update); + + assertNotNull(getReqData(PDP1)); + } + + /** + * Asserts that the queue contains the specified messages. + * + * @param testName the test name + * @param messages messages that are expected in the queue + */ + private void assertQueueContains(String testName, PdpMessage... messages) { + assertEquals(testName, messages.length, queue.size()); + + int count = 0; + for (PdpMessage msg : messages) { + ++count; + + QueueToken<PdpMessage> token = queue.remove(); + assertSame(testName + "-" + count, msg, token.get()); + } + } + + /** + * Makes parameters to configure a map. + * + * @return new parameters + */ + private PdpModifyRequestMapParams makeParameters() { + return new PdpModifyRequestMapParams().setModifyLock(lock).setParams(pdpParams).setPublisher(pub) + .setResponseDispatcher(disp).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers); + } + + /** + * Gets the listener that was registered with the dispatcher and invokes it. + * + * @return the response processor + */ + @SuppressWarnings("unchecked") + private TypedMessageListener<PdpStatus> invokeProcessResponse() { + @SuppressWarnings("rawtypes") + ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class); + + // indicate that is has been published + queue.remove().replaceItem(null); + + verify(disp).register(any(), processResp.capture()); + + TypedMessageListener<PdpStatus> func = processResp.getValue(); + func.onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response); + + return func; + } + + /** + * Gets the timeout handler that was registered with the timer manager and invokes it. + * + * @param timers the timer manager whose handler is to be invoked + * @param ntimes number of times to invoke the timeout handler + * @return the timeout handler + */ + @SuppressWarnings("unchecked") + private void invokeTimeoutHandler(TimerManager timers, int ntimes) { + @SuppressWarnings("rawtypes") + ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class); + + for (int count = 1; count <= ntimes; ++count) { + // indicate that is has been published + queue.remove().replaceItem(null); + + verify(timers, times(count)).register(any(), timeoutHdlr.capture()); + + @SuppressWarnings("rawtypes") + List<Consumer> lst = timeoutHdlr.getAllValues(); + + Consumer<String> hdlr = lst.get(lst.size() - 1); + hdlr.accept(PDP1); + } + } + + /** + * Gets the request data from the map. + * + * @param pdpName name of the PDP whose data is desired + * @return the request data, or {@code null} if the PDP is not in the map + */ + private ModifyReqData getReqData(String pdpName) { + Map<String, ModifyReqData> name2data = Whitebox.getInternalState(map, "name2data"); + return name2data.get(pdpName); + } + + /** + * Makes an update message. + * + * @return a new update message + */ + private PdpUpdate makeUpdate() { + PdpUpdate upd = new PdpUpdate(); + + upd.setDescription("update-description"); + upd.setName(PDP1); + upd.setPdpGroup("group1-a"); + upd.setPdpSubgroup("sub1-a"); + upd.setPdpType("drools"); + + ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0")); + ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0")); + + upd.setPolicies(Arrays.asList(policy1, policy2)); + + return upd; + } + + /** + * Makes a state-change message. + * + * @return a new state-change message + */ + private PdpStateChange makeStateChange() { + PdpStateChange cng = new PdpStateChange(); + + cng.setName(PDP1); + cng.setState(PdpState.SAFE); + + return cng; + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java new file mode 100644 index 00000000..f15b2a04 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/PublisherTest.java @@ -0,0 +1,265 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; +import org.onap.policy.common.endpoints.event.comm.TopicListener; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.resources.ResourceUtils; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.PolicyPapException; + +public class PublisherTest extends Threaded { + + // these messages will have different request IDs + private static final PdpStateChange MSG1 = new PdpStateChange(); + private static final PdpStateChange MSG2 = new PdpStateChange(); + + // MSG1 & MSG2, respectively, encoded as JSON + private static final String JSON1; + private static final String JSON2; + + static { + try { + Coder coder = new StandardCoder(); + JSON1 = coder.encode(MSG1); + JSON2 = coder.encode(MSG2); + + } catch (CoderException e) { + throw new ExceptionInInitializerError(e); + } + } + + /** + * Max time to wait, in milliseconds, for a thread to terminate or for a message to be + * published. + */ + private static final long MAX_WAIT_MS = 5000; + + private Publisher pub; + private MyListener listener; + + /** + * Configures the topic and attaches a listener. + * + * @throws Exception if an error occurs + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Properties props = new Properties(); + File propFile = new File(ResourceUtils.getFilePath4Resource("parameters/topic.properties")); + try (FileInputStream inp = new FileInputStream(propFile)) { + props.load(inp); + } + + TopicEndpoint.manager.shutdown(); + + TopicEndpoint.manager.addTopics(props); + TopicEndpoint.manager.start(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TopicEndpoint.manager.shutdown(); + } + + /** + * Set up. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + super.setUp(); + + pub = new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP); + + listener = new MyListener(); + TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).register(listener); + } + + /** + * Tear down. + * + * @throws Exception if an error occurs + */ + @After + public void tearDown() throws Exception { + TopicEndpoint.manager.getNoopTopicSink(PapConstants.TOPIC_POLICY_PDP_PAP).unregister(listener); + + super.tearDown(); + } + + @Override + protected void stopThread() { + if (pub != null) { + pub.stop(); + } + } + + @Test + public void testPublisher_testStop() throws Exception { + startThread(pub); + pub.stop(); + + assertTrue(waitStop()); + + // ensure we can call "stop" a second time + pub.stop(); + } + + @Test + public void testPublisher_Ex() throws Exception { + assertThatThrownBy(() -> new Publisher("unknwon-topic")).isInstanceOf(PolicyPapException.class); + } + + @Test + public void testEnqueue() throws Exception { + // enqueue before running + pub.enqueue(new QueueToken<>(MSG1)); + + // enqueue another after running + startThread(pub); + pub.enqueue(new QueueToken<>(MSG2)); + + String json = listener.await(MAX_WAIT_MS); + assertEquals(JSON1, json); + + json = listener.await(MAX_WAIT_MS); + assertEquals(JSON2, json); + } + + @Test + public void testRun_StopBeforeProcess() throws Exception { + // enqueue before running + QueueToken<PdpMessage> token = new QueueToken<>(MSG1); + pub.enqueue(token); + + // stop before running + pub.stop(); + + // start the thread and then wait for it to stop + startThread(pub); + assertTrue(waitStop()); + + // message should not have been processed + assertTrue(listener.isEmpty()); + assertNotNull(token.get()); + } + + @Test + public void testRun() throws Exception { + startThread(pub); + + // should skip token with null message + QueueToken<PdpMessage> token1 = new QueueToken<>(null); + pub.enqueue(token1); + + QueueToken<PdpMessage> token2 = new QueueToken<>(MSG2); + pub.enqueue(token2); + + // only the second message should have been processed + String json = listener.await(MAX_WAIT_MS); + assertEquals(JSON2, json); + assertNull(token2.get()); + + pub.stop(); + assertTrue(waitStop()); + + // no more messages + assertTrue(listener.isEmpty()); + } + + @Test + public void testGetNext() throws Exception { + startThread(pub); + + // wait for a message to be processed + pub.enqueue(new QueueToken<>(MSG1)); + assertNotNull(listener.await(MAX_WAIT_MS)); + + // now interrupt + interruptThread(); + + assertTrue(waitStop()); + } + + /** + * Listener for messages published to the topic. + */ + private static class MyListener implements TopicListener { + + /** + * Released every time a message is added to the queue. + */ + private final Semaphore sem = new Semaphore(0); + + private final ConcurrentLinkedQueue<String> messages = new ConcurrentLinkedQueue<>(); + + public boolean isEmpty() { + return messages.isEmpty(); + } + + /** + * Waits for a message to be published to the topic. + * + * @param waitMs time to wait, in milli-seconds + * @return the next message in the queue, or {@code null} if there are no messages + * or if the timeout was reached + * @throws InterruptedException if this thread was interrupted while waiting + */ + public String await(long waitMs) throws InterruptedException { + if (sem.tryAcquire(waitMs, TimeUnit.MILLISECONDS)) { + return messages.poll(); + } + + return null; + } + + @Override + public void onTopicEvent(CommInfrastructure commType, String topic, String event) { + messages.add(event); + sem.release(); + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java new file mode 100644 index 00000000..3ff91edf --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/QueueTokenTest.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.junit.Test; + +public class QueueTokenTest { + private static final String STRING1 = "a string"; + private static final String STRING2 = "another string"; + + private QueueToken<String> token; + + @Test + public void test() throws Exception { + token = new QueueToken<>(STRING1); + assertEquals(STRING1, token.get()); + + assertEquals(STRING1, token.replaceItem(STRING2)); + assertEquals(STRING2, token.get()); + + assertEquals(STRING2, token.replaceItem(null)); + assertEquals(null, token.get()); + + assertEquals(null, token.replaceItem(null)); + assertEquals(null, token.get()); + + assertEquals(null, token.replaceItem(STRING1)); + assertEquals(null, token.get()); + + /* + * Now do some mult-threaded tests, hopefully causing some contention. + */ + + token = new QueueToken<>(""); + + Set<String> values = ConcurrentHashMap.newKeySet(); + + // create and configure the threads + Thread[] threads = new Thread[100]; + for (int x = 0; x < threads.length; ++x) { + final int xfinal = x; + threads[x] = new Thread(() -> values.add(token.replaceItem("me-" + xfinal))); + threads[x].setDaemon(true); + } + + // start the threads all at once + for (Thread thread : threads) { + thread.start(); + } + + // wait for the threads to stop + for (Thread thread : threads) { + thread.join(5000); + } + + values.add(token.replaceItem(null)); + + for (int x = 0; x < threads.length; ++x) { + String msg = "me-" + x; + assertTrue(msg, values.contains(msg)); + } + } + +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java new file mode 100644 index 00000000..28e5cf96 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/RequestDataTest.java @@ -0,0 +1,476 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.function.Consumer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.common.utils.services.ServiceManager; +import org.onap.policy.models.pdp.concepts.PdpMessage; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.comm.msgdata.MessageData; +import org.onap.policy.pap.main.parameters.RequestDataParams; +import org.powermock.reflect.Whitebox; + +public class RequestDataTest { + private static final String PDP1 = "pdp_1"; + private static final String MY_MSG_TYPE = "my-type"; + + private MyRequestData reqdata; + private Publisher pub; + private RequestIdDispatcher<PdpStatus> disp; + private Object lock; + private TimerManager timers; + private TimerManager.Timer timer; + private MyMessageData msgdata; + private Queue<QueueToken<PdpMessage>> queue; + private PdpStatus response; + + /** + * Sets up. + */ + @Before + @SuppressWarnings("unchecked") + public void setUp() { + pub = mock(Publisher.class); + disp = mock(RequestIdDispatcher.class); + lock = new Object(); + timers = mock(TimerManager.class); + timer = mock(TimerManager.Timer.class); + msgdata = new MyMessageData(PDP1); + queue = new LinkedList<>(); + response = new PdpStatus(); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + queue.add(invocation.getArgumentAt(0, QueueToken.class)); + return null; + } + }).when(pub).enqueue(any()); + + when(timers.register(any(), any())).thenReturn(timer); + + reqdata = new MyRequestData( + new RequestDataParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp)); + + reqdata.setName(PDP1); + + msgdata = spy(msgdata); + reqdata = spy(reqdata); + } + + @Test + public void testRequestData_Invalid() { + // null params + assertThatThrownBy(() -> new MyRequestData(null)).isInstanceOf(NullPointerException.class); + + // invalid params + assertThatIllegalArgumentException().isThrownBy(() -> new MyRequestData(new RequestDataParams())); + } + + @Test + public void testStartPublishing() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + verify(disp).register(eq(msgdata.getMessage().getRequestId()), any()); + verify(timers).register(eq(PDP1), any()); + verify(pub).enqueue(any()); + + QueueToken<PdpMessage> token = queue.poll(); + assertNotNull(token); + assertSame(msgdata.getMessage(), token.get()); + + + // invoking start() again has no effect - invocation counts remain the same + reqdata.startPublishing(); + verify(disp, times(1)).register(eq(msgdata.getMessage().getRequestId()), any()); + verify(timers, times(1)).register(eq(PDP1), any()); + verify(pub, times(1)).enqueue(any()); + } + + @Test + public void testStopPublishing() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + reqdata.stopPublishing(); + + verify(disp).unregister(msgdata.getMessage().getRequestId()); + verify(timer).cancel(); + + + // invoking stop() again has no effect - invocation counts remain the same + reqdata.stopPublishing(); + + verify(disp, times(1)).unregister(msgdata.getMessage().getRequestId()); + verify(timer, times(1)).cancel(); + } + + @Test + public void testConfigure() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + verify(disp).register(eq(msgdata.getMessage().getRequestId()), any()); + verify(timers).register(eq(PDP1), any()); + verify(pub).enqueue(any()); + + ServiceManager svcmgr = Whitebox.getInternalState(reqdata, "svcmgr"); + assertEquals(PDP1 + " " + MY_MSG_TYPE, svcmgr.getName()); + + + // bump this so we can verify that it is reset by configure() + reqdata.bumpRetryCount(); + + reqdata.configure(msgdata); + assertEquals(0, getRetryCount()); + } + + @Test + public void testEnqueue() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + // replace the message with a new message + reqdata.stopPublishing(); + MyMessageData msgdata2 = new MyMessageData(PDP1); + reqdata.configure(msgdata2); + reqdata.startPublishing(); + + // should still only be one token in the queue + QueueToken<PdpMessage> token = queue.poll(); + assertNull(queue.poll()); + assertNotNull(token); + assertSame(msgdata2.getMessage(), token.get()); + + // null out the token + token.replaceItem(null); + + // enqueue a new message + reqdata.stopPublishing(); + MyMessageData msgdata3 = new MyMessageData(PDP1); + reqdata.configure(msgdata3); + reqdata.startPublishing(); + + // a new token should have been placed in the queue + QueueToken<PdpMessage> token2 = queue.poll(); + assertTrue(token != token2); + assertNull(queue.poll()); + assertNotNull(token2); + assertSame(msgdata3.getMessage(), token2.get()); + } + + @Test + public void testResetRetryCount_testBumpRetryCount() { + when(msgdata.getMaxRetryCount()).thenReturn(2); + + reqdata.configure(msgdata); + + assertEquals(0, getRetryCount()); + assertTrue(reqdata.bumpRetryCount()); + assertTrue(reqdata.bumpRetryCount()); + + // limit should now be reached and it should go no further + assertFalse(reqdata.bumpRetryCount()); + assertFalse(reqdata.bumpRetryCount()); + + assertEquals(2, getRetryCount()); + + reqdata.resetRetryCount(); + assertEquals(0, getRetryCount()); + } + + @Test + public void testRetryCountExhausted() { + reqdata.configure(msgdata); + + reqdata.retryCountExhausted(); + + verify(reqdata).allCompleted(); + } + + @Test + public void testProcessResponse() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + invokeProcessResponse(); + + verify(reqdata).stopPublishing(); + verify(msgdata).checkResponse(response); + verify(msgdata).completed(); + } + + @Test + public void testProcessResponse_NotPublishing() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + reqdata.stopPublishing(); + + invokeProcessResponse(); + + // only invocation should have been the one before calling invokeProcessResponse() + verify(reqdata, times(1)).stopPublishing(); + + verify(msgdata, never()).checkResponse(response); + verify(msgdata, never()).completed(); + } + + @Test + public void testProcessResponse_NotActive() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + when(reqdata.isActive()).thenReturn(false); + + invokeProcessResponse(); + + // it should still stop publishing + verify(reqdata).stopPublishing(); + + verify(msgdata, never()).checkResponse(response); + verify(msgdata, never()).completed(); + } + + @Test + public void testProcessResponse_ResponseFailed() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + when(msgdata.checkResponse(response)).thenReturn("failed"); + + invokeProcessResponse(); + + verify(reqdata).stopPublishing(); + verify(msgdata).checkResponse(response); + + verify(msgdata, never()).completed(); + verify(msgdata).mismatch("failed"); + } + + @Test + public void testHandleTimeout() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + // remove it from the queue + queue.poll().replaceItem(null); + + invokeTimeoutHandler(); + + // count should have been bumped + assertEquals(1, getRetryCount()); + + // should have invoked startPublishing() a second time + verify(reqdata, times(2)).startPublishing(); + } + + @Test + public void testHandleTimeout_NotPublishing() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + reqdata.stopPublishing(); + + invokeTimeoutHandler(); + + // should NOT have invoked startPublishing() a second time + verify(reqdata, times(1)).startPublishing(); + verify(reqdata, never()).retryCountExhausted(); + } + + @Test + public void testHandleTimeout_NotActive() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + when(reqdata.isActive()).thenReturn(false); + + invokeTimeoutHandler(); + + // should NOT have invoked startPublishing() a second time + verify(reqdata, times(1)).startPublishing(); + verify(reqdata, never()).retryCountExhausted(); + } + + @Test + public void testHandleTimeout_StillInQueue() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + reqdata.bumpRetryCount(); + + invokeTimeoutHandler(); + + // count should reset the count + assertEquals(0, getRetryCount()); + + // should have invoked startPublishing() a second time + verify(reqdata, times(2)).startPublishing(); + } + + @Test + public void testHandleTimeout_RetryExhausted() { + reqdata.configure(msgdata); + reqdata.startPublishing(); + + // exhaust the count + reqdata.bumpRetryCount(); + reqdata.bumpRetryCount(); + reqdata.bumpRetryCount(); + + // remove it from the queue + queue.poll().replaceItem(null); + + invokeTimeoutHandler(); + + // should NOT have invoked startPublishing() a second time + verify(reqdata, times(1)).startPublishing(); + + verify(reqdata).retryCountExhausted(); + } + + @Test + public void testGetName_testSetName() { + reqdata.setName("abc"); + assertEquals("abc", reqdata.getName()); + } + + @Test + public void testGetWrapper() { + reqdata.configure(msgdata); + assertSame(msgdata, reqdata.getWrapper()); + } + + /** + * Gets the retry count from the data. + * @return the current retry count + */ + private int getRetryCount() { + return Whitebox.getInternalState(reqdata, "retryCount"); + } + + /** + * Gets the listener that was registered with the dispatcher and invokes it. + */ + @SuppressWarnings("unchecked") + private void invokeProcessResponse() { + @SuppressWarnings("rawtypes") + ArgumentCaptor<TypedMessageListener> processResp = ArgumentCaptor.forClass(TypedMessageListener.class); + + verify(disp).register(any(), processResp.capture()); + + processResp.getValue().onTopicEvent(CommInfrastructure.NOOP, PapConstants.TOPIC_POLICY_PDP_PAP, response); + } + + /** + * Gets the timeout handler that was registered with the timer manager and invokes it. + */ + @SuppressWarnings("unchecked") + private void invokeTimeoutHandler() { + @SuppressWarnings("rawtypes") + ArgumentCaptor<Consumer> timeoutHdlr = ArgumentCaptor.forClass(Consumer.class); + + verify(timers).register(any(), timeoutHdlr.capture()); + + timeoutHdlr.getValue().accept(PDP1); + } + + private class MyRequestData extends RequestData { + + public MyRequestData(RequestDataParams params) { + super(params); + } + + @Override + protected boolean isActive() { + return true; + } + + @Override + protected void allCompleted() { + // do nothing + } + } + + private class MyMessageData extends MessageData { + + public MyMessageData(String pdpName) { + super(new PdpStateChange(), 1, timers); + + PdpStateChange msg = (PdpStateChange) getMessage(); + msg.setName(pdpName); + msg.setState(PdpState.ACTIVE); + } + + @Override + public String getType() { + return MY_MSG_TYPE; + } + + @Override + public void mismatch(String reason) { + // do nothing + } + + @Override + public void completed() { + // do nothing + } + + @Override + public String checkResponse(PdpStatus response) { + // always valid - return null + return null; + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java b/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java new file mode 100644 index 00000000..d6a0d1f1 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/Threaded.java @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import org.junit.After; +import org.junit.Before; + +/** + * Super class for tests that run a background thread. + */ +public abstract class Threaded { + + /** + * Max time to wait, in milliseconds, for a thread to terminate or for a message to be + * published. + */ + public static final long MAX_WAIT_MS = 5000; + + /** + * The current background thread. + */ + private Thread thread; + + /** + * Indicates that a test is about to begin. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + thread = null; + } + + /** + * Invokes the "stopper" function to tell the background thread to exit and then waits + * for it to terminate. + * + * @throws Exception if an error occurs + */ + @After + public void tearDown() throws Exception { + stopThread(); + waitStop(); + } + + /** + * Signals the background thread to stop. + * + * @throws Exception if an error occurs + */ + protected abstract void stopThread() throws Exception; + + /** + * Starts a background thread. + * + * @param runner what should be executed in the background thread + * @throws IllegalStateException if a background thread is already running + */ + protected void startThread(Runnable runner) { + if (thread != null) { + throw new IllegalStateException("a background thread is already running"); + } + + thread = new Thread(runner); + thread.setDaemon(true); + thread.start(); + } + + /** + * Interrupts the background thread. + */ + protected void interruptThread() { + thread.interrupt(); + } + + /** + * Waits for the background thread to stop. + * + * @return {@code true} if the thread has stopped, {@code false} otherwise + * @throws InterruptedException if this thread is interrupted while waiting + */ + protected boolean waitStop() throws InterruptedException { + if (thread != null) { + Thread thread2 = thread; + thread = null; + + thread2.join(MAX_WAIT_MS); + + return !thread2.isAlive(); + } + + return true; + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java new file mode 100644 index 00000000..3d5da908 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/TimerManagerTest.java @@ -0,0 +1,400 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.pap.main.comm.TimerManager.Timer; + +public class TimerManagerTest extends Threaded { + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String MGR_NAME = "my-manager"; + private static final String NAME1 = "timer-A"; + private static final String NAME2 = "timer-B"; + private static final String NAME3 = "timer-C"; + + private static final long MGR_TIMEOUT_MS = 10000; + + private MyManager mgr; + + /* + * This is a field to prevent checkstyle from complaining about the distance between + * its assignment and its use. + */ + private long tcur; + + /** + * Sets up. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + super.setUp(); + + mgr = new MyManager(MGR_NAME, MGR_TIMEOUT_MS); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Override + protected void stopThread() throws Exception { + if (mgr != null) { + mgr.stop(); + mgr.stopSleep(); + } + } + + @Test + public void testTimerManager_testStop() throws Exception { + startThread(mgr); + + mgr.stop(); + assertTrue(waitStop()); + + // ensure we can call "stop" a second time + mgr.stop(); + } + + @Test + public void testRegister() throws Exception { + mgr.register(NAME2, mgr::addToQueue); + mgr.register(NAME1, mgr::addToQueue); + + // goes to the end of the queue + mgr.register(NAME2, mgr::addToQueue); + + startThread(mgr); + + mgr.allowSleep(2); + + assertEquals(NAME1, mgr.awaitTimer()); + assertEquals(NAME2, mgr.awaitTimer()); + } + + @Test + public void testRun_Ex() throws Exception { + startThread(mgr); + mgr.register(NAME1, mgr::addToQueue); + + mgr.awaitSleep(); + + // background thread is "sleeping" - now we can interrupt it + interruptThread(); + + assertTrue(waitStop()); + } + + @Test + public void testProcessTimers() throws Exception { + startThread(mgr); + mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + mgr.allowSleep(1); + + mgr.register(NAME2, mgr::addToQueue); + mgr.awaitSleep(); + + // tell it to stop before returning from "sleep" + mgr.stop(); + mgr.allowSleep(1); + + assertTrue(waitStop()); + + assertEquals(NAME1, mgr.pollResult()); + assertNull(mgr.pollResult()); + } + + @Test + public void testGetNextTimer() throws Exception { + startThread(mgr); + mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + mgr.allowSleep(1); + + mgr.register(NAME2, mgr::addToQueue); + + mgr.awaitSleep(); + } + + @Test + public void testProcessTimer_StopWhileWaiting() throws Exception { + startThread(mgr); + mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + mgr.allowSleep(1); + + mgr.register(NAME2, mgr::addToQueue); + mgr.awaitSleep(); + + mgr.stop(); + mgr.allowSleep(1); + + assertTrue(waitStop()); + + // should have stopped after processing the first timer + assertEquals(NAME1, mgr.pollResult()); + assertNull(mgr.pollResult()); + } + + @Test + public void testProcessTimer_CancelWhileWaiting() throws Exception { + startThread(mgr); + Timer timer = mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + + timer.cancel(); + mgr.allowSleep(1); + + mgr.register(NAME2, mgr::addToQueue); + mgr.awaitSleep(); + mgr.allowSleep(1); + + mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + + // should have fired timer 2, but not timer 1 + assertEquals(NAME2, mgr.pollResult()); + assertNull(mgr.pollResult()); + } + + @Test + public void testProcessTimer_TimerEx() throws Exception { + startThread(mgr); + mgr.register(NAME1, name -> { + throw new RuntimeException(EXPECTED_EXCEPTION); + }); + mgr.register(NAME2, mgr::addToQueue); + mgr.awaitSleep(); + + mgr.allowSleep(2); + + mgr.register(NAME3, mgr::addToQueue); + mgr.awaitSleep(); + + // timer 1 fired but threw an exception, so only timer 2 should be in the queue + assertEquals(NAME2, mgr.pollResult()); + } + + @Test + public void testTimerAwait() throws Exception { + startThread(mgr); + + // same times - only need one sleep + mgr.register(NAME1, mgr::addToQueue); + mgr.register(NAME2, mgr::addToQueue); + mgr.awaitSleep(); + + tcur = mgr.currentTimeMillis(); + + mgr.allowSleep(1); + + // next one will have a new timeout, so expect to sleep + mgr.register(NAME3, mgr::addToQueue); + mgr.awaitSleep(); + + long tcur2 = mgr.currentTimeMillis(); + assertTrue(tcur2 >= tcur + MGR_TIMEOUT_MS); + + assertEquals(NAME1, mgr.pollResult()); + assertEquals(NAME2, mgr.pollResult()); + assertNull(mgr.pollResult()); + } + + @Test + public void testTimerCancel_WhileWaiting() throws Exception { + startThread(mgr); + + Timer timer = mgr.register(NAME1, mgr::addToQueue); + mgr.awaitSleep(); + + // cancel while sleeping + timer.cancel(); + + mgr.register(NAME2, mgr::addToQueue); + + // allow it to sleep through both timers + mgr.allowSleep(2); + + // only timer 2 should have fired + assertEquals(NAME2, mgr.timedPollResult()); + } + + @Test + public void testTimerCancel_ViaReplace() throws Exception { + startThread(mgr); + + mgr.register(NAME1, name -> mgr.addToQueue("hello")); + mgr.awaitSleep(); + + // replace the timer while the background thread is sleeping + mgr.register(NAME1, name -> mgr.addToQueue("world")); + + // allow it to sleep through both timers + mgr.allowSleep(2); + + // only timer 2 should have fired + assertEquals("world", mgr.timedPollResult()); + } + + @Test + public void testTimerToString() { + Timer timer = mgr.register(NAME1, mgr::addToQueue); + assertNotNull(timer.toString()); + } + + @Test + public void testCurrentTimeMillis() { + long tbeg = System.currentTimeMillis(); + long tcur = new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).currentTimeMillis(); + long tend = System.currentTimeMillis(); + + assertTrue(tcur >= tbeg); + assertTrue(tend >= tcur); + } + + @Test + public void testSleep() throws Exception { + long tbeg = System.currentTimeMillis(); + new TimerManager(MGR_NAME, MGR_TIMEOUT_MS).sleep(10); + long tend = System.currentTimeMillis(); + + assertTrue(tend >= tbeg + 10); + } + + private static class MyManager extends TimerManager { + private AtomicLong curTime = new AtomicLong(1000); + private LinkedBlockingQueue<Boolean> sleepEntered = new LinkedBlockingQueue<>(); + private LinkedBlockingQueue<Boolean> shouldStop = new LinkedBlockingQueue<>(); + private LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>(); + + public MyManager(String name, long waitTimeMs) { + super(name, waitTimeMs); + } + + /** + * Registers a timer. Also increments {@link #curTime} so that every time has a + * different expiration time, which prevents some issue with the junit tests. + */ + @Override + public Timer register(String timerName, Consumer<String> action) { + curTime.addAndGet(1); + return super.register(timerName, action); + } + + /** + * Stops the "sleep". + */ + public void stopSleep() { + shouldStop.add(true); + } + + /** + * Allows the manager to "sleep" several times. + * + * @param ntimes the number of times the manager should sleep + */ + public void allowSleep(int ntimes) { + for (int x = 0; x < ntimes; ++x) { + shouldStop.add(false); + } + } + + /** + * Waits for the manager to "sleep". + * + * @throws InterruptedException if the thread is interrupted while waiting for the + * background thread to sleep + */ + public void awaitSleep() throws InterruptedException { + if (sleepEntered.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS) == null) { + fail("background thread failed to sleep"); + } + } + + @Override + protected long currentTimeMillis() { + return curTime.get(); + } + + @Override + protected void sleep(long timeMs) throws InterruptedException { + sleepEntered.offer(true); + + if (!shouldStop.take()) { + // test thread did not request that we stop + curTime.addAndGet(timeMs); + } + } + + /** + * Waits for a timer to fire. + * + * @return the message the timer added to {@link #results} + * @throws InterruptedException if this thread is interrupted while waiting + */ + private String awaitTimer() throws InterruptedException { + return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + + /** + * Adds a name to the queue. + * + * @param name the name to add + */ + private void addToQueue(String name) { + results.add(name); + } + + /** + * Polls for a result. + * + * @return the next result, or {@code null} + */ + private String pollResult() { + return results.poll(); + } + + /** + * Polls for a result, waiting a limited amount of time. + * + * @return the next result, or {@code null} + * @throws InterruptedException if the thread is interrupted while waiting + */ + private String timedPollResult() throws InterruptedException { + return results.poll(MAX_WAIT_MS, TimeUnit.MILLISECONDS); + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java new file mode 100644 index 00000000..68b02635 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/MessageDataTest.java @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.TimerManager; + +public class MessageDataTest { + private static final int RETRIES = 1; + + private MyData data; + private TimerManager timers; + + /** + * Sets up. + */ + @Before + public void setUp() { + timers = mock(TimerManager.class); + + data = new MyData(); + } + + @Test + public void testGetMessage() { + assertNotNull(data.getMessage()); + } + + @Test + public void testGetType() { + assertEquals(PdpStateChange.class.getSimpleName(), data.getType()); + } + + @Test + public void testGetMaxRetryCount() { + assertEquals(RETRIES, data.getMaxRetryCount()); + } + + @Test + public void testGetTimers() { + assertSame(timers, data.getTimers()); + } + + private class MyData extends MessageData { + + public MyData() { + super(new PdpStateChange(), RETRIES, timers); + } + + @Override + public void mismatch(String reason) { + // do nothing + } + + @Override + public void completed() { + // do nothing + } + + @Override + public String checkResponse(PdpStatus response) { + // always succeed + return null; + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java new file mode 100644 index 00000000..029775fa --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/StateChangeDataTest.java @@ -0,0 +1,122 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.PdpParameters; +import org.onap.policy.pap.main.parameters.PdpStateChangeParameters; + +public class StateChangeDataTest { + private static final String MY_NAME = "my-name"; + private static final String DIFFERENT = "different"; + private static final PdpState MY_STATE = PdpState.SAFE; + private static final PdpState DIFF_STATE = PdpState.TERMINATED; + private static final int RETRIES = 1; + + private MyData data; + private PdpModifyRequestMapParams params; + private TimerManager timers; + private PdpStatus response; + + /** + * Sets up. + */ + @Before + public void setUp() { + timers = mock(TimerManager.class); + response = new PdpStatus(); + PdpParameters pdpParams = mock(PdpParameters.class); + PdpStateChangeParameters stateParams = mock(PdpStateChangeParameters.class); + + when(stateParams.getMaxRetryCount()).thenReturn(RETRIES); + when(pdpParams.getStateChangeParameters()).thenReturn(stateParams); + + params = new PdpModifyRequestMapParams().setParams(pdpParams).setStateChangeTimers(timers); + + response.setName(MY_NAME); + response.setState(MY_STATE); + + data = new MyData(); + } + + @Test + public void testGetMaxRetryCount() { + assertEquals(RETRIES, data.getMaxRetryCount()); + } + + @Test + public void testGetTimers() { + assertSame(timers, data.getTimers()); + } + + @Test + public void testStateChangeCheckResponse() { + assertNull(data.checkResponse(response)); + } + + @Test + public void testStateChangeCheckResponse_MismatchedName() { + response.setName(DIFFERENT); + + assertEquals("name does not match", data.checkResponse(response)); + } + + @Test + public void testStateChangeCheckResponse_MismatchedState() { + response.setState(DIFF_STATE); + + assertEquals("state is TERMINATED, but expected SAFE", data.checkResponse(response)); + } + + private class MyData extends StateChangeData { + + public MyData() { + super(new PdpStateChange(), params); + + PdpStateChange msg = (PdpStateChange) getMessage(); + + msg.setName(MY_NAME); + msg.setState(MY_STATE); + } + + @Override + public void mismatch(String reason) { + // do nothing + } + + @Override + public void completed() { + // do nothing + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java new file mode 100644 index 00000000..8676c95e --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/msgdata/UpdateDataTest.java @@ -0,0 +1,169 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm.msgdata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.models.base.PfConceptKey; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.tosca.simple.concepts.ToscaPolicy; +import org.onap.policy.pap.main.comm.TimerManager; +import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams; +import org.onap.policy.pap.main.parameters.PdpParameters; +import org.onap.policy.pap.main.parameters.PdpUpdateParameters; + +public class UpdateDataTest { + private static final String MY_NAME = "my-name"; + private static final String DIFFERENT = "different"; + private static final int RETRIES = 1; + + private MyData data; + private PdpModifyRequestMapParams params; + private TimerManager timers; + private PdpUpdate update; + private PdpStatus response; + + /** + * Sets up. + */ + @Before + public void setUp() { + timers = mock(TimerManager.class); + response = new PdpStatus(); + PdpParameters pdpParams = mock(PdpParameters.class); + PdpUpdateParameters stateParams = mock(PdpUpdateParameters.class); + + when(stateParams.getMaxRetryCount()).thenReturn(RETRIES); + when(pdpParams.getUpdateParameters()).thenReturn(stateParams); + + params = new PdpModifyRequestMapParams().setParams(pdpParams).setUpdateTimers(timers); + + update = makeUpdate(); + + response.setName(MY_NAME); + response.setPdpGroup(update.getPdpGroup()); + response.setPdpSubgroup(update.getPdpSubgroup()); + response.setPolicies(update.getPolicies()); + + data = new MyData(update); + } + + @Test + public void testGetMaxRetryCount() { + assertEquals(RETRIES, data.getMaxRetryCount()); + } + + @Test + public void testGetTimers() { + assertSame(timers, data.getTimers()); + } + + @Test + public void testUpdateCheckResponse() { + assertNull(data.checkResponse(response)); + } + + @Test + public void testUpdateDataCheckResponse_MismatchedName() { + response.setName(DIFFERENT); + + assertEquals("name does not match", data.checkResponse(response)); + } + + @Test + public void testUpdateDataCheckResponse_MismatchedGroup() { + response.setPdpGroup(DIFFERENT); + + assertEquals("group does not match", data.checkResponse(response)); + } + + @Test + public void testUpdateDataCheckResponse_MismatchedSubGroup() { + response.setPdpSubgroup(DIFFERENT); + + assertEquals("subgroup does not match", data.checkResponse(response)); + } + + @Test + public void testUpdateDataCheckResponse_MismatchedPoliciesLength() { + response.setPolicies(Arrays.asList(update.getPolicies().get(0))); + + assertEquals("policies do not match", data.checkResponse(response)); + } + + @Test + public void testUpdateDataCheckResponse_MismatchedPolicies() { + ArrayList<ToscaPolicy> policies = new ArrayList<>(update.getPolicies()); + policies.set(0, new ToscaPolicy(new PfConceptKey(DIFFERENT, "10.0.0"))); + + response.setPolicies(policies); + + assertEquals("policies do not match", data.checkResponse(response)); + } + + /** + * Makes an update message. + * + * @return a new update message + */ + private PdpUpdate makeUpdate() { + PdpUpdate upd = new PdpUpdate(); + + upd.setDescription("update-description"); + upd.setName(MY_NAME); + upd.setPdpGroup("group1-a"); + upd.setPdpSubgroup("sub1-a"); + upd.setPdpType("drools"); + + ToscaPolicy policy1 = new ToscaPolicy(new PfConceptKey("policy-1-a", "1.0.0")); + ToscaPolicy policy2 = new ToscaPolicy(new PfConceptKey("policy-2-a", "1.1.0")); + + upd.setPolicies(Arrays.asList(policy1, policy2)); + + return upd; + } + + private class MyData extends UpdateData { + + public MyData(PdpUpdate message) { + super(message, params); + } + + @Override + public void mismatch(String reason) { + // do nothing + } + + @Override + public void completed() { + // do nothing + } + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java new file mode 100644 index 00000000..3e691899 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/parameters/TestPdpModifyRequestMapParams.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.parameters; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.Publisher; +import org.onap.policy.pap.main.comm.TimerManager; + +public class TestPdpModifyRequestMapParams { + private PdpModifyRequestMapParams params; + private Publisher pub; + private RequestIdDispatcher<PdpStatus> disp; + private Object lock; + private PdpParameters pdpParams; + private TimerManager updTimers; + private TimerManager stateTimers; + + /** + * Sets up the objects and creates an empty {@link #params}. + */ + @Before + @SuppressWarnings("unchecked") + public void setUp() { + pub = mock(Publisher.class); + disp = mock(RequestIdDispatcher.class); + lock = new Object(); + pdpParams = mock(PdpParameters.class); + updTimers = mock(TimerManager.class); + stateTimers = mock(TimerManager.class); + + params = new PdpModifyRequestMapParams().setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp); + } + + @Test + public void testGettersSetters() { + assertSame(params, params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers)); + + assertSame(pdpParams, params.getParams()); + assertSame(updTimers, params.getUpdateTimers()); + assertSame(stateTimers, params.getStateChangeTimers()); + + // super class data should also be available + assertSame(pub, params.getPublisher()); + assertSame(disp, params.getResponseDispatcher()); + assertSame(lock, params.getModifyLock()); + } + + @Test + public void testValidate() { + // no exception + params.setParams(pdpParams).setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate(); + } + + @Test + public void testValidate_MissingPdpParams() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setStateChangeTimers(stateTimers).setUpdateTimers(updTimers).validate()) + .withMessageContaining("PDP param"); + } + + @Test + public void testValidate_MissingStateChangeTimers() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setParams(pdpParams).setUpdateTimers(updTimers).validate()) + .withMessageContaining("state"); + } + + @Test + public void testValidate_MissingUpdateTimers() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setParams(pdpParams).setStateChangeTimers(stateTimers).validate()) + .withMessageContaining("update"); + } + + @Test + public void testValidate_MissingSuperclassData() { + // leave out one of the superclass fields + assertThatIllegalArgumentException().isThrownBy( + () -> new PdpModifyRequestMapParams() + .setPublisher(pub) + .setResponseDispatcher(disp).setParams(pdpParams).setStateChangeTimers(stateTimers) + .setUpdateTimers(updTimers).validate()).withMessageContaining("Lock"); + + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java b/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java new file mode 100644 index 00000000..16d247f2 --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/parameters/TestRequestDataParams.java @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.parameters; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.pap.main.comm.Publisher; + +public class TestRequestDataParams { + private RequestDataParams params; + private Publisher pub; + private RequestIdDispatcher<PdpStatus> disp; + private Object lock; + + /** + * Sets up the objects and creates an empty {@link #params}. + */ + @Before + @SuppressWarnings("unchecked") + public void setUp() { + pub = mock(Publisher.class); + disp = mock(RequestIdDispatcher.class); + lock = new Object(); + + params = new RequestDataParams(); + } + + @Test + public void testGettersSetters() { + assertSame(params, params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp)); + + assertSame(pub, params.getPublisher()); + assertSame(disp, params.getResponseDispatcher()); + assertSame(lock, params.getModifyLock()); + } + + @Test + public void testValidate() { + // no exception + params.setModifyLock(lock).setPublisher(pub).setResponseDispatcher(disp).validate(); + } + + @Test + public void testValidate_MissingLock() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setPublisher(pub).setResponseDispatcher(disp).validate()) + .withMessageContaining("Lock"); + } + + @Test + public void testValidate_MissingDispatcher() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setModifyLock(lock).setPublisher(pub).validate()) + .withMessageContaining("Dispatcher"); + } + + @Test + public void testValidate_MissingPublisher() { + assertThatIllegalArgumentException().isThrownBy( + () -> params.setModifyLock(lock).setResponseDispatcher(disp).validate()) + .withMessageContaining("publisher"); + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java b/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java index cfa2ae92..6c9e092e 100644 --- a/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java +++ b/main/src/test/java/org/onap/policy/pap/main/startstop/TestPapActivator.java @@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.assertThatIllegalStateException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.FileInputStream; @@ -35,6 +36,7 @@ import org.junit.Test; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyPapException; +import org.onap.policy.pap.main.comm.PdpModifyRequestMap; import org.onap.policy.pap.main.parameters.CommonTestData; import org.onap.policy.pap.main.parameters.PapParameterGroup; import org.onap.policy.pap.main.parameters.PapParameterHandler; @@ -75,6 +77,7 @@ public class TestPapActivator { /** * Method for cleanup after each test. + * * @throws Exception if an error occurs */ @After @@ -95,6 +98,7 @@ public class TestPapActivator { // ensure items were added to the registry assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class)); assertNotNull(Registry.get(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class)); + assertNotNull(Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class)); // repeat - should throw an exception assertThatIllegalStateException().isThrownBy(() -> activator.start()); @@ -108,6 +112,11 @@ public class TestPapActivator { activator.stop(); assertFalse(activator.isAlive()); + // ensure items have been removed from the registry + assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_LOCK, Object.class, null)); + assertNull(Registry.getOrDefault(PapConstants.REG_STATISTICS_MANAGER, PapStatisticsManager.class, null)); + assertNull(Registry.getOrDefault(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class, null)); + // repeat - should throw an exception assertThatIllegalStateException().isThrownBy(() -> activator.stop()); assertFalse(activator.isAlive()); |