diff options
Diffstat (limited to 'models-interactions/model-actors/actorServiceProvider/src')
8 files changed, 93 insertions, 37 deletions
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index b885b5c25..ba4785922 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; */ public class Util { private static final Logger logger = LoggerFactory.getLogger(Util.class); + private static final Coder coder = new StandardCoder(); private Util() { // do nothing @@ -84,11 +85,8 @@ public class Util { * @return the translated object */ public static <T> T translate(String identifier, Object source, Class<T> clazz) { - Coder coder = new StandardCoder(); - try { - String json = coder.encode(source); - return coder.decode(json, clazz); + return coder.convert(source, clazz); } catch (CoderException | RuntimeException e) { throw new IllegalArgumentException("cannot translate parameters for " + identifier, e); @@ -105,10 +103,6 @@ public class Util { */ @SuppressWarnings("unchecked") public static Map<String, Object> translateToMap(String identifier, Object source) { - if (source == null) { - return null; - } - return translate(identifier, source, LinkedHashMap.class); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 1c37a8e0d..3e02da611 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -130,15 +130,27 @@ public class ControlLoopEventContext implements Serializable { return null; } - CompletableFuture<OperationOutcome> future = retrievers.get(name); - if (future != null) { - return future; - } + /* + * Return any existing future, if it wasn't canceled. Otherwise, start a new + * request. + */ - future = params.start(); + // @formatter:off + CompletableFuture<OperationOutcome> oldFuture = + retrievers.compute(name, (key, future) -> (future == null || future.isCancelled() ? null : future)); + // @formatter:on - CompletableFuture<OperationOutcome> oldFuture = retrievers.putIfAbsent(name, future); if (oldFuture != null) { + return oldFuture; + } + + /* + * Note: must NOT invoke params.start() within retrievers.compute(), as start() + * may invoke obtain() which would cause a recursive update to the retrievers map. + */ + CompletableFuture<OperationOutcome> future = params.start(); + + if ((oldFuture = retrievers.putIfAbsent(name, future)) != null) { future.cancel(false); return oldFuture; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java index f82015d6b..d1e21f8fd 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java @@ -214,14 +214,14 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial case SUCCESS: logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); + setOutcome(outcome, PolicyResult.SUCCESS, response); postProcessResponse(outcome, rawResponse, response); return outcome; case FAILURE: logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); + return setOutcome(outcome, PolicyResult.FAILURE, response); case STILL_WAITING: default: @@ -232,6 +232,18 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial } /** + * Sets an operation's outcome and default message based on the result. + * + * @param outcome operation to be updated + * @param result result of the operation + * @param response response used to populate the outcome + * @return the updated operation + */ + public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) { + return setOutcome(outcome, result); + } + + /** * Processes a successful response. * * @param outcome outcome to be populated diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index f1829d79a..c3c0f6dc2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -148,21 +148,23 @@ public abstract class HttpOperation<T> extends OperationPartial { controller.add(requester.apply(callback)); // once "future" completes, process the response, and then complete the controller - future.thenApplyAsync(response -> processResponse(outcome, url, response), executor) + future.thenComposeAsync(response -> processResponse(outcome, url, response), executor) .whenCompleteAsync(controller.delayedComplete(), executor); return controller; } /** - * Processes a response. This method simply sets the outcome to SUCCESS. + * Processes a response. This method decodes the response, sets the outcome based on + * the response, and then returns a completed future. * * @param outcome outcome to be populate * @param url URL to which to request was sent * @param response raw response to process - * @return the outcome + * @return a future to cancel or await the outcome */ - protected OperationOutcome processResponse(OperationOutcome outcome, String url, Response rawResponse) { + protected CompletableFuture<OperationOutcome> processResponse(OperationOutcome outcome, String url, + Response rawResponse) { logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); @@ -173,7 +175,6 @@ public abstract class HttpOperation<T> extends OperationPartial { T response; if (responseClass == String.class) { response = responseClass.cast(strResponse); - } else { try { response = makeCoder().decode(strResponse, responseClass); @@ -187,26 +188,40 @@ public abstract class HttpOperation<T> extends OperationPartial { if (!isSuccess(rawResponse, response)) { logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(), rawResponse.getStatus(), params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); + return CompletableFuture.completedFuture(setOutcome(outcome, PolicyResult.FAILURE, response)); } logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); - postProcessResponse(outcome, url, rawResponse, response); + setOutcome(outcome, PolicyResult.SUCCESS, response); + return postProcessResponse(outcome, url, rawResponse, response); + } - return outcome; + /** + * Sets an operation's outcome and default message based on the result. + * + * @param outcome operation to be updated + * @param result result of the operation + * @param response response used to populate the outcome + * @return the updated operation + */ + public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, T response) { + return setOutcome(outcome, result); } /** - * Processes a successful response. + * Processes a successful response. This method simply returns the outcome wrapped in + * a completed future. * * @param outcome outcome to be populate * @param url URL to which to request was sent * @param rawResponse raw response * @param response decoded response + * @return a future to cancel or await the outcome */ - protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, T response) { - // do nothing + protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url, + Response rawResponse, T response) { + + return CompletableFuture.completedFuture(outcome); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index 0b3497197..680a56f89 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory; public abstract class OperationPartial implements Operation { private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); private static final Coder coder = new StandardCoder(); + public static final long DEFAULT_RETRY_WAIT_MS = 1000L; // values extracted from the operator diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java index fcb463518..93beab1cb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; */ public class TopicListenerImpl implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class); - private static StandardCoder coder = new StandardCoder(); + private static final StandardCoder coder = new StandardCoder(); /** * Maps selector to a forwarder. diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java index b462043d5..cf2426214 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java @@ -42,6 +42,7 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOp public class ControlLoopEventContextTest { private static final UUID REQ_ID = UUID.randomUUID(); + private static final String ITEM_KEY = "obtain-C"; private Map<String, String> enrichment; private VirtualControlLoopEvent event; @@ -118,13 +119,28 @@ public class ControlLoopEventContextTest { ControlLoopOperationParams params2 = mock(ControlLoopOperationParams.class); when(params2.start()).thenReturn(future2); - assertSame(future2, context.obtain("obtain-C", params2)); + assertSame(future2, context.obtain(ITEM_KEY, params2)); return future; }); - assertSame(future2, context.obtain("obtain-C", params)); + assertSame(future2, context.obtain(ITEM_KEY, params)); // should have canceled the interrupted future assertTrue(future.isCancelled()); + + // return a new future next time start() is called + CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>(); + when(params.start()).thenReturn(future3); + + // repeat - should get the same future + assertSame(future2, context.obtain(ITEM_KEY, params)); + assertSame(future2, context.obtain(ITEM_KEY, params)); + + // future2 should still be active + assertFalse(future2.isCancelled()); + + // cancel it - now we should get the new future + future2.cancel(false); + assertSame(future3, context.obtain(ITEM_KEY, params)); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java index 50cb8fa8f..8189c74fe 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java @@ -302,8 +302,10 @@ public class HttpOperationTest { * Tests processResponse() when it's a success and the response type is a String. */ @Test - public void testProcessResponseSuccessString() { - assertSame(outcome, oper.processResponse(outcome, PATH, response)); + public void testProcessResponseSuccessString() throws Exception { + CompletableFuture<OperationOutcome> result = oper.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } @@ -311,9 +313,11 @@ public class HttpOperationTest { * Tests processResponse() when it's a failure. */ @Test - public void testProcessResponseFailure() { + public void testProcessResponseFailure() throws Exception { when(response.getStatus()).thenReturn(555); - assertSame(outcome, oper.processResponse(outcome, PATH, response)); + CompletableFuture<OperationOutcome> result = oper.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.FAILURE, outcome.getResult()); } @@ -321,12 +325,14 @@ public class HttpOperationTest { * Tests processResponse() when the decoder succeeds. */ @Test - public void testProcessResponseDecodeOk() throws CoderException { + public void testProcessResponseDecodeOk() throws Exception { when(response.readEntity(String.class)).thenReturn("10"); MyGetOperation<Integer> oper2 = new MyGetOperation<>(Integer.class); - assertSame(outcome, oper2.processResponse(outcome, PATH, response)); + CompletableFuture<OperationOutcome> result = oper2.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } |