aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java87
1 files changed, 50 insertions, 37 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
index 82d16273f..f60abc24c 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java
@@ -36,11 +36,15 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
@@ -67,6 +71,9 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
// stopped
private static final long REST_REQUESTOR_WAIT_SLEEP_TIME = 50;
+ // The Key for property
+ private static final String HTTP_CODE_STATUS = "HTTP_CODE_STATUS";
+
// The REST parameters read from the parameter service
private RestRequestorCarrierTechnologyParameters restConsumerProperties;
@@ -104,27 +111,30 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
private String untaggedUrl = null;
+ // The pattern for filtering status code
+ private Pattern httpCodeFilterPattern = null;
+
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the REST Properties
if (!(consumerParameters
- .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
- final String errorMessage = "specified consumer properties are not applicable to REST Requestor consumer ("
- + this.name + ")";
+ .getCarrierTechnologyParameters() instanceof RestRequestorCarrierTechnologyParameters)) {
+ final String errorMessage =
+ "specified consumer properties are not applicable to REST Requestor consumer (" + this.name + ")";
LOGGER.warn(errorMessage);
throw new ApexEventException(errorMessage);
}
- restConsumerProperties = (RestRequestorCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
+ restConsumerProperties =
+ (RestRequestorCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
// Check if we are in peered mode
if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
final String errorMessage = "REST Requestor consumer (" + this.name
- + ") must run in peered requestor mode with a REST Requestor producer";
+ + ") must run in peered requestor mode with a REST Requestor producer";
LOGGER.warn(errorMessage);
throw new ApexEventException(errorMessage);
}
@@ -132,7 +142,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
// Check if the HTTP method has been set
if (restConsumerProperties.getHttpMethod() == null) {
restConsumerProperties
- .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD);
+ .setHttpMethod(RestRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD);
}
// Check if the HTTP URL has been set
@@ -151,6 +161,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
throw new ApexEventException(errorMessage, e);
}
+ this.httpCodeFilterPattern = Pattern.compile(restConsumerProperties.getHttpCodeFilter());
+
// Set the requestor timeout
if (consumerParameters.getPeerTimeout(EventHandlerPeeredMode.REQUESTOR) != 0) {
restRequestTimeout = consumerParameters.getPeerTimeout(EventHandlerPeeredMode.REQUESTOR);
@@ -159,7 +171,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
// Check if HTTP headers has been set
if (restConsumerProperties.checkHttpHeadersSet()) {
LOGGER.debug("REST Requestor consumer has http headers ({}): {}", this.name,
- Arrays.deepToString(restConsumerProperties.getHttpHeaders()));
+ Arrays.deepToString(restConsumerProperties.getHttpHeaders()));
}
// Initialize the HTTP client
@@ -177,8 +189,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
try {
incomingRestRequestQueue.add(restRequest);
} catch (final Exception requestException) {
- final String errorMessage = "could not queue request \"" + restRequest + "\" on REST Requestor consumer ("
- + this.name + ")";
+ final String errorMessage =
+ "could not queue request \"" + restRequest + "\" on REST Requestor consumer (" + this.name + ")";
LOGGER.warn(errorMessage, requestException);
throw new ApexEventRuntimeException(errorMessage);
}
@@ -238,8 +250,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
// Take the next event from the queue
- final ApexRestRequest restRequest = incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME,
- TimeUnit.MILLISECONDS);
+ final ApexRestRequest restRequest =
+ incomingRestRequestQueue.poll(REST_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
if (restRequest == null) {
// Poll timed out, check for request timeouts
timeoutExpiredRequests();
@@ -256,8 +268,7 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
op.filter(inputProperty::contains)
.orElseThrow(() -> new ApexEventRuntimeException(
"key\"" + op.get() + "\"specified on url \"" + restConsumerProperties.getUrl()
- + "\"not found in execution properties passed by the current policy"))
- );
+ + "\"not found in execution properties passed by the current policy")));
untaggedUrl = names.stream().reduce(untaggedUrl,
(acc, str) -> acc.replace("{" + str + "}", (String) inputExecutionProperties.get(str)));
@@ -301,8 +312,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
// Interrupt timed out requests and remove them from the ongoing map
for (final ApexRestRequest timedoutRequest : timedoutRequestList) {
- final String errorMessage = "REST Requestor consumer (" + this.name + "), REST request timed out: "
- + timedoutRequest;
+ final String errorMessage =
+ "REST Requestor consumer (" + this.name + "), REST request timed out: " + timedoutRequest;
LOGGER.warn(errorMessage);
ongoingRestRequestMap.remove(timedoutRequest);
@@ -356,12 +367,14 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
// Execute the REST request
final Response response = sendEventAsRestRequest(untaggedUrl);
- // Check that the event request worked
- if (!Response.Status.Family.familyOf(response.getStatus()).equals(Response.Status.Family.SUCCESSFUL)) {
- final String errorMessage = "reception of response to \"" + request + "\" from URL \""
- + untaggedUrl + "\" failed with status code "
- + response.getStatus() + " and message \"" + response.readEntity(String.class)
- + "\"";
+ // Match the return code
+ Matcher isPass = httpCodeFilterPattern.matcher(String.valueOf(response.getStatus()));
+
+ // Check that the request worked
+ if (!isPass.matches()) {
+ final String errorMessage ="reception of event from URL \"" + restConsumerProperties.getUrl()
+ + "\" failed with status code " + response.getStatus() + " and message \""
+ + response.readEntity(String.class) + "\"";
throw new ApexEventRuntimeException(errorMessage);
}
@@ -369,14 +382,18 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
final String eventJsonString = response.readEntity(String.class);
// Check there is content
- if (eventJsonString == null || eventJsonString.trim().length() == 0) {
- final String errorMessage = "received an enpty response to \"" + request + "\" from URL \""
- + untaggedUrl + "\"";
+ if (StringUtils.isBlank(eventJsonString)) {
+ final String errorMessage =
+ "received an empty response to \"" + request + "\" from URL \"" + untaggedUrl + "\"";
throw new ApexEventRuntimeException(errorMessage);
}
+ // build a key and value property in excutionProperties
+ Properties executionProperties = new Properties();
+ executionProperties.put(HTTP_CODE_STATUS, response.getStatus());
+
// Send the event into Apex
- eventReceiver.receiveEvent(request.getExecutionId(), new Properties(), eventJsonString);
+ eventReceiver.receiveEvent(request.getExecutionId(), executionProperties, eventJsonString);
synchronized (eventsReceivedLock) {
eventsReceived++;
@@ -403,24 +420,20 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable {
* @return the response to the REST request
*/
public Response sendEventAsRestRequest(String untaggedUrl) {
+ Builder headers = client.target(untaggedUrl).request(APPLICATION_JSON)
+ .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap());
switch (restConsumerProperties.getHttpMethod()) {
case GET:
- return client.target(untaggedUrl).request(APPLICATION_JSON)
- .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).get();
+ return headers.get();
case PUT:
- return client.target(untaggedUrl).request(APPLICATION_JSON)
- .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap())
- .put(Entity.json(request.getEvent()));
+ return headers.put(Entity.json(request.getEvent()));
case POST:
- return client.target(untaggedUrl).request(APPLICATION_JSON)
- .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap())
- .post(Entity.json(request.getEvent()));
+ return headers.post(Entity.json(request.getEvent()));
case DELETE:
- return client.target(untaggedUrl).request(APPLICATION_JSON)
- .headers(restConsumerProperties.getHttpHeadersAsMultivaluedMap()).delete();
+ return headers.delete();
default:
break;