diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java | 87 |
1 files changed, 50 insertions, 37 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index 82d16273f..f60abc24c 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -36,11 +36,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.service.engine.event.ApexEventConsumer; @@ -67,6 +71,9 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // stopped private static final long REST_REQUESTOR_WAIT_SLEEP_TIME = 50; + // The Key for property + private static final String HTTP_CODE_STATUS = "HTTP_CODE_STATUS"; + // The REST parameters read from the parameter service private RestRequestorCarrierTechnologyParameters restConsumerProperties; @@ -104,27 +111,30 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { private String untaggedUrl = null; + // The pattern for filtering status code + private Pattern httpCodeFilterPattern = null; + @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the REST Properties if (!(consumerParameters - .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { - final String errorMessage = "specified consumer properties are not applicable to REST Requestor consumer (" - + this.name + ")"; + .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - restConsumerProperties = (RestRequestorCarrierTechnologyParameters) consumerParameters - .getCarrierTechnologyParameters(); + restConsumerProperties = + (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Check if we are in peered mode if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { final String errorMessage = "REST Requestor consumer (" + this.name - + ") must run in peered requestor mode with a REST Requestor producer"; + + ") must run in peered requestor mode with a REST Requestor producer"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } @@ -132,7 +142,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Check if the HTTP method has been set if (restConsumerProperties.getHttpMethod() == null) { restConsumerProperties - .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); + .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); } // Check if the HTTP URL has been set @@ -151,6 +161,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { throw new ApexEventException(errorMessage, e); } + this.httpCodeFilterPattern = Pattern.compile(restConsumerProperties.getHttpCodeFilter()); + // Set the requestor timeout if (consumerParameters.getPeerTimeout(EventHandlerPeeredMode.REQUESTOR) != 0) { restRequestTimeout = consumerParameters.getPeerTimeout(EventHandlerPeeredMode.REQUESTOR); @@ -159,7 +171,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Check if HTTP headers has been set if (restConsumerProperties.checkHttpHeadersSet()) { LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name, - Arrays.deepToString(restConsumerProperties.getHttpHeaders())); + Arrays.deepToString(restConsumerProperties.getHttpHeaders())); } // Initialize the HTTP client @@ -177,8 +189,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { try { incomingRestRequestQueue.add(restRequest); } catch (final Exception requestException) { - final String errorMessage = "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" - + this.name + ")"; + final String errorMessage = + "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")"; LOGGER.warn(errorMessage, requestException); throw new ApexEventRuntimeException(errorMessage); } @@ -238,8 +250,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { while (consumerThread.isAlive() && !stopOrderedFlag) { try { // Take the next event from the queue - final ApexRestRequest restRequest = incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, - TimeUnit.MILLISECONDS); + final ApexRestRequest restRequest = + incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); if (restRequest == null) { // Poll timed out, check for request timeouts timeoutExpiredRequests(); @@ -256,8 +268,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { op.filter(inputProperty::contains) .orElseThrow(() -> new ApexEventRuntimeException( "key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl() - + "\"not found in execution properties passed by the current policy")) - ); + + "\"not found in execution properties passed by the current policy"))); untaggedUrl = names.stream().reduce(untaggedUrl, (acc, str) -> acc.replace("{" + str + "}", (String) inputExecutionProperties.get(str))); @@ -301,8 +312,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Interrupt timed out requests and remove them from the ongoing map for (final ApexRestRequest timedoutRequest : timedoutRequestList) { - final String errorMessage = "REST Requestor consumer (" + this.name + "), REST request timed out: " - + timedoutRequest; + final String errorMessage = + "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest; LOGGER.warn(errorMessage); ongoingRestRequestMap.remove(timedoutRequest); @@ -356,12 +367,14 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Execute the REST request final Response response = sendEventAsRestRequest(untaggedUrl); - // Check that the event request worked - if (!Response.Status.Family.familyOf(response.getStatus()).equals(Response.Status.Family.SUCCESSFUL)) { - final String errorMessage = "reception of response to \"" + request + "\" from URL \"" - + untaggedUrl + "\" failed with status code " - + response.getStatus() + " and message \"" + response.readEntity(String.class) - + "\""; + // Match the return code + Matcher isPass = httpCodeFilterPattern.matcher(String.valueOf(response.getStatus())); + + // Check that the request worked + if (!isPass.matches()) { + final String errorMessage ="reception of event from URL \"" + restConsumerProperties.getUrl() + + "\" failed with status code " + response.getStatus() + " and message \"" + + response.readEntity(String.class) + "\""; throw new ApexEventRuntimeException(errorMessage); } @@ -369,14 +382,18 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { final String eventJsonString = response.readEntity(String.class); // Check there is content - if (eventJsonString == null || eventJsonString.trim().length() == 0) { - final String errorMessage = "received an enpty response to \"" + request + "\" from URL \"" - + untaggedUrl + "\""; + if (StringUtils.isBlank(eventJsonString)) { + final String errorMessage = + "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\""; throw new ApexEventRuntimeException(errorMessage); } + // build a key and value property in excutionProperties + Properties executionProperties = new Properties(); + executionProperties.put(HTTP_CODE_STATUS, response.getStatus()); + // Send the event into Apex - eventReceiver.receiveEvent(request.getExecutionId(), new Properties(), eventJsonString); + eventReceiver.receiveEvent(request.getExecutionId(), executionProperties, eventJsonString); synchronized (eventsReceivedLock) { eventsReceived++; @@ -403,24 +420,20 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { * @return the response to the REST request */ public Response sendEventAsRestRequest(String untaggedUrl) { + Builder headers = client.target(untaggedUrl).request(APPLICATION_JSON) + .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()); switch (restConsumerProperties.getHttpMethod()) { case GET: - return client.target(untaggedUrl).request(APPLICATION_JSON) - .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).get(); + return headers.get(); case PUT: - return client.target(untaggedUrl).request(APPLICATION_JSON) - .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()) - .put(Entity.json(request.getEvent())); + return headers.put(Entity.json(request.getEvent())); case POST: - return client.target(untaggedUrl).request(APPLICATION_JSON) - .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()) - .post(Entity.json(request.getEvent())); + return headers.post(Entity.json(request.getEvent())); case DELETE: - return client.target(untaggedUrl).request(APPLICATION_JSON) - .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).delete(); + return headers.delete(); default: break; |