From e44f1a1c58efed9fbe2efce78864aaee3b577003 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Mon, 17 Feb 2020 13:57:56 -0500 Subject: More changes to actor code Use Coder.convert() from policy-common. Passed response to setOutcome(). Changed class names from XxxOperator to XxxOperation. Modified SDNC junits to invoke start() instead of startOperationAsync(). Changed context obtain() to re-run if the future was canceled. Added junit support class, BasicBidirectionalTopicOperation. Modified HttpOperation to allow subsequent requests to be issued. Some actors, like SO, send an initial HTTP request and then follow it with HTTP "are you done?" requests. Issue-ID: POLICY-2363-prop Change-Id: I12b5c2d4f07254e0cb79fabfe1ccf844b70a0654 Signed-off-by: Jim Hahn --- .../controlloop/actorserviceprovider/Util.java | 10 ++---- .../controlloop/ControlLoopEventContext.java | 24 +++++++++---- .../impl/BidirectionalTopicOperation.java | 16 +++++++-- .../actorserviceprovider/impl/HttpOperation.java | 39 +++++++++++++++------- .../impl/OperationPartial.java | 1 + .../topic/TopicListenerImpl.java | 2 +- .../controlloop/ControlLoopEventContextTest.java | 20 +++++++++-- .../impl/HttpOperationTest.java | 18 ++++++---- 8 files changed, 93 insertions(+), 37 deletions(-) (limited to 'models-interactions/model-actors/actorServiceProvider') diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java index b885b5c25..ba4785922 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/Util.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; */ public class Util { private static final Logger logger = LoggerFactory.getLogger(Util.class); + private static final Coder coder = new StandardCoder(); private Util() { // do nothing @@ -84,11 +85,8 @@ public class Util { * @return the translated object */ public static T translate(String identifier, Object source, Class clazz) { - Coder coder = new StandardCoder(); - try { - String json = coder.encode(source); - return coder.decode(json, clazz); + return coder.convert(source, clazz); } catch (CoderException | RuntimeException e) { throw new IllegalArgumentException("cannot translate parameters for " + identifier, e); @@ -105,10 +103,6 @@ public class Util { */ @SuppressWarnings("unchecked") public static Map translateToMap(String identifier, Object source) { - if (source == null) { - return null; - } - return translate(identifier, source, LinkedHashMap.class); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java index 1c37a8e0d..3e02da611 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContext.java @@ -130,15 +130,27 @@ public class ControlLoopEventContext implements Serializable { return null; } - CompletableFuture future = retrievers.get(name); - if (future != null) { - return future; - } + /* + * Return any existing future, if it wasn't canceled. Otherwise, start a new + * request. + */ - future = params.start(); + // @formatter:off + CompletableFuture oldFuture = + retrievers.compute(name, (key, future) -> (future == null || future.isCancelled() ? null : future)); + // @formatter:on - CompletableFuture oldFuture = retrievers.putIfAbsent(name, future); if (oldFuture != null) { + return oldFuture; + } + + /* + * Note: must NOT invoke params.start() within retrievers.compute(), as start() + * may invoke obtain() which would cause a recursive update to the retrievers map. + */ + CompletableFuture future = params.start(); + + if ((oldFuture = retrievers.putIfAbsent(name, future)) != null) { future.cancel(false); return oldFuture; } diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java index f82015d6b..d1e21f8fd 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/BidirectionalTopicOperation.java @@ -214,14 +214,14 @@ public abstract class BidirectionalTopicOperation extends OperationPartial case SUCCESS: logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); + setOutcome(outcome, PolicyResult.SUCCESS, response); postProcessResponse(outcome, rawResponse, response); return outcome; case FAILURE: logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(), params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); + return setOutcome(outcome, PolicyResult.FAILURE, response); case STILL_WAITING: default: @@ -231,6 +231,18 @@ public abstract class BidirectionalTopicOperation extends OperationPartial } } + /** + * Sets an operation's outcome and default message based on the result. + * + * @param outcome operation to be updated + * @param result result of the operation + * @param response response used to populate the outcome + * @return the updated operation + */ + public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) { + return setOutcome(outcome, result); + } + /** * Processes a successful response. * diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java index f1829d79a..c3c0f6dc2 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperation.java @@ -148,21 +148,23 @@ public abstract class HttpOperation extends OperationPartial { controller.add(requester.apply(callback)); // once "future" completes, process the response, and then complete the controller - future.thenApplyAsync(response -> processResponse(outcome, url, response), executor) + future.thenComposeAsync(response -> processResponse(outcome, url, response), executor) .whenCompleteAsync(controller.delayedComplete(), executor); return controller; } /** - * Processes a response. This method simply sets the outcome to SUCCESS. + * Processes a response. This method decodes the response, sets the outcome based on + * the response, and then returns a completed future. * * @param outcome outcome to be populate * @param url URL to which to request was sent * @param response raw response to process - * @return the outcome + * @return a future to cancel or await the outcome */ - protected OperationOutcome processResponse(OperationOutcome outcome, String url, Response rawResponse) { + protected CompletableFuture processResponse(OperationOutcome outcome, String url, + Response rawResponse) { logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId()); @@ -173,7 +175,6 @@ public abstract class HttpOperation extends OperationPartial { T response; if (responseClass == String.class) { response = responseClass.cast(strResponse); - } else { try { response = makeCoder().decode(strResponse, responseClass); @@ -187,26 +188,40 @@ public abstract class HttpOperation extends OperationPartial { if (!isSuccess(rawResponse, response)) { logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(), rawResponse.getStatus(), params.getRequestId()); - return setOutcome(outcome, PolicyResult.FAILURE); + return CompletableFuture.completedFuture(setOutcome(outcome, PolicyResult.FAILURE, response)); } logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId()); - setOutcome(outcome, PolicyResult.SUCCESS); - postProcessResponse(outcome, url, rawResponse, response); + setOutcome(outcome, PolicyResult.SUCCESS, response); + return postProcessResponse(outcome, url, rawResponse, response); + } - return outcome; + /** + * Sets an operation's outcome and default message based on the result. + * + * @param outcome operation to be updated + * @param result result of the operation + * @param response response used to populate the outcome + * @return the updated operation + */ + public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, T response) { + return setOutcome(outcome, result); } /** - * Processes a successful response. + * Processes a successful response. This method simply returns the outcome wrapped in + * a completed future. * * @param outcome outcome to be populate * @param url URL to which to request was sent * @param rawResponse raw response * @param response decoded response + * @return a future to cancel or await the outcome */ - protected void postProcessResponse(OperationOutcome outcome, String url, Response rawResponse, T response) { - // do nothing + protected CompletableFuture postProcessResponse(OperationOutcome outcome, String url, + Response rawResponse, T response) { + + return CompletableFuture.completedFuture(outcome); } /** diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index 0b3497197..680a56f89 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory; public abstract class OperationPartial implements Operation { private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); private static final Coder coder = new StandardCoder(); + public static final long DEFAULT_RETRY_WAIT_MS = 1000L; // values extracted from the operator diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java index fcb463518..93beab1cb 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/topic/TopicListenerImpl.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; */ public class TopicListenerImpl implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(TopicListenerImpl.class); - private static StandardCoder coder = new StandardCoder(); + private static final StandardCoder coder = new StandardCoder(); /** * Maps selector to a forwarder. diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java index b462043d5..cf2426214 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/controlloop/ControlLoopEventContextTest.java @@ -42,6 +42,7 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOp public class ControlLoopEventContextTest { private static final UUID REQ_ID = UUID.randomUUID(); + private static final String ITEM_KEY = "obtain-C"; private Map enrichment; private VirtualControlLoopEvent event; @@ -118,13 +119,28 @@ public class ControlLoopEventContextTest { ControlLoopOperationParams params2 = mock(ControlLoopOperationParams.class); when(params2.start()).thenReturn(future2); - assertSame(future2, context.obtain("obtain-C", params2)); + assertSame(future2, context.obtain(ITEM_KEY, params2)); return future; }); - assertSame(future2, context.obtain("obtain-C", params)); + assertSame(future2, context.obtain(ITEM_KEY, params)); // should have canceled the interrupted future assertTrue(future.isCancelled()); + + // return a new future next time start() is called + CompletableFuture future3 = new CompletableFuture<>(); + when(params.start()).thenReturn(future3); + + // repeat - should get the same future + assertSame(future2, context.obtain(ITEM_KEY, params)); + assertSame(future2, context.obtain(ITEM_KEY, params)); + + // future2 should still be active + assertFalse(future2.isCancelled()); + + // cancel it - now we should get the new future + future2.cancel(false); + assertSame(future3, context.obtain(ITEM_KEY, params)); } } diff --git a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java index 50cb8fa8f..8189c74fe 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java +++ b/models-interactions/model-actors/actorServiceProvider/src/test/java/org/onap/policy/controlloop/actorserviceprovider/impl/HttpOperationTest.java @@ -302,8 +302,10 @@ public class HttpOperationTest { * Tests processResponse() when it's a success and the response type is a String. */ @Test - public void testProcessResponseSuccessString() { - assertSame(outcome, oper.processResponse(outcome, PATH, response)); + public void testProcessResponseSuccessString() throws Exception { + CompletableFuture result = oper.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } @@ -311,9 +313,11 @@ public class HttpOperationTest { * Tests processResponse() when it's a failure. */ @Test - public void testProcessResponseFailure() { + public void testProcessResponseFailure() throws Exception { when(response.getStatus()).thenReturn(555); - assertSame(outcome, oper.processResponse(outcome, PATH, response)); + CompletableFuture result = oper.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.FAILURE, outcome.getResult()); } @@ -321,12 +325,14 @@ public class HttpOperationTest { * Tests processResponse() when the decoder succeeds. */ @Test - public void testProcessResponseDecodeOk() throws CoderException { + public void testProcessResponseDecodeOk() throws Exception { when(response.readEntity(String.class)).thenReturn("10"); MyGetOperation oper2 = new MyGetOperation<>(Integer.class); - assertSame(outcome, oper2.processResponse(outcome, PATH, response)); + CompletableFuture result = oper2.processResponse(outcome, PATH, response); + assertTrue(result.isDone()); + assertSame(outcome, result.get()); assertEquals(PolicyResult.SUCCESS, outcome.getResult()); } -- cgit 1.2.3-korg