diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java | 40 |
1 files changed, 32 insertions, 8 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index 776440232..db4eacb4b 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,11 +27,15 @@ import java.util.Arrays; import java.util.EnumMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Optional; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -98,6 +103,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // The number of the next request runner thread private static long nextRequestRunnerThreadNo = 0; + private String untaggedUrl = null; + @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, final ApexEventReceiver incomingEventReceiver) throws ApexEventException { @@ -240,6 +247,23 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { continue; } + Properties inputExecutionProperties = restRequest.getExecutionProperties(); + untaggedUrl = restConsumerProperties.getUrl(); + if (inputExecutionProperties != null) { + Set<String> names = restConsumerProperties.getKeysFromUrl(); + Set<String> inputProperty = inputExecutionProperties.stringPropertyNames(); + + names.stream().map(key -> Optional.of(key)).forEach(op -> { + op.filter(str -> inputProperty.contains(str)) + .orElseThrow(() -> new ApexEventRuntimeException( + "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl() + + "\"not found in execution properties passed by the current policy")); + }); + + untaggedUrl = names.stream().reduce(untaggedUrl, + (acc, str) -> acc.replace("{" + str + "}", (String) inputExecutionProperties.get(str))); + } + // Set the time stamp of the REST request restRequest.setTimestamp(System.currentTimeMillis()); @@ -331,12 +355,12 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { try { // Execute the REST request - final Response response = sendEventAsRestRequest(); + final Response response = sendEventAsRestRequest(untaggedUrl); // Check that the event request worked if (!Response.Status.Family.familyOf(response.getStatus()).equals(Response.Status.Family.SUCCESSFUL)) { final String errorMessage = "reception of response to \"" + request + "\" from URL \"" - + restConsumerProperties.getUrl() + "\" failed with status code " + + untaggedUrl + "\" failed with status code " + response.getStatus() + " and message \"" + response.readEntity(String.class) + "\""; throw new ApexEventRuntimeException(errorMessage); @@ -348,7 +372,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Check there is content if (eventJsonString == null || eventJsonString.trim().length() == 0) { final String errorMessage = "received an enpty response to \"" + request + "\" from URL \"" - + restConsumerProperties.getUrl() + "\""; + + untaggedUrl + "\""; throw new ApexEventRuntimeException(errorMessage); } @@ -379,24 +403,24 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { * * @return the response to the REST request */ - public Response sendEventAsRestRequest() { + public Response sendEventAsRestRequest(String untaggedUrl) { switch (restConsumerProperties.getHttpMethod()) { case GET: - return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) + return client.target(untaggedUrl).request(APPLICATION_JSON) .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).get(); case PUT: - return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) + return client.target(untaggedUrl).request(APPLICATION_JSON) .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()) .put(Entity.json(request.getEvent())); case POST: - return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) + return client.target(untaggedUrl).request(APPLICATION_JSON) .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()) .post(Entity.json(request.getEvent())); case DELETE: - return client.target(restConsumerProperties.getUrl()).request(APPLICATION_JSON) + return client.target(untaggedUrl).request(APPLICATION_JSON) .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).delete(); default: |