diff options
Diffstat (limited to 'plugins')
2 files changed, 59 insertions, 51 deletions
diff --git a/plugins/plugins-context/context-schema/context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java b/plugins/plugins-context/context-schema/context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java index 831962042..b4cc38602 100644 --- a/plugins/plugins-context/context-schema/context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java +++ b/plugins/plugins-context/context-schema/context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java @@ -127,19 +127,42 @@ public class AvroSchemaHelper extends AbstractSchemaHelper { return object; } - // Check that the incoming object is a string, the incoming object must be a string - // containing Json - String objectString; + String objectString = getStringObject(object); + + // Translate illegal characters in incoming JSON keys to legal Avro values + objectString = AvroSchemaKeyTranslationUtilities.translateIllegalKeys(objectString, false); + + // Decode the object + Object decodedObject; try { - if (object == null) { - objectString = null; - } - if (object != null && avroSchema.getType().equals(Schema.Type.STRING)) { - objectString = object.toString().trim(); + final JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, objectString); + decodedObject = new GenericDatumReader<GenericRecord>(avroSchema).read(null, jsonDecoder); + } catch (final Exception e) { + final String returnString = getUserKey().getID() + ": object \"" + objectString + + "\" Avro unmarshalling failed: " + e.getMessage(); + LOGGER.warn(returnString, e); + throw new ContextRuntimeException(returnString, e); + } + + // Now map the decoded object into something we can handle + return avroObjectMapper.mapFromAvro(decodedObject); + } + + /** + * Check that the incoming object is a string, the incoming object must be a string containing + * Json + * + * @param object incoming object + * @return object as String + */ + private String getStringObject(final Object object) { + try { + if (isObjectString(object)) { + String objectString = object.toString().trim(); if (objectString.length() == 0) { - objectString = "\"\""; + return "\"\""; } else if (objectString.length() == 1) { - objectString = "\"" + objectString + "\""; + return "\"" + objectString + "\""; } else { // All strings must be quoted for decoding if (objectString.charAt(0) != '"') { @@ -149,35 +172,22 @@ public class AvroSchemaHelper extends AbstractSchemaHelper { objectString += '"'; } } + return objectString; } else { - objectString = (String) object; + return (String) object; } } catch (final ClassCastException e) { final String returnString = getUserKey().getID() + ": object \"" + object + "\" of type \"" - + object.getClass().getCanonicalName() + "\" must be assignable to \"" + + (object != null ? object.getClass().getCanonicalName() : "null") + "\" must be assignable to \"" + getSchemaClass().getCanonicalName() + "\" or be a Json string representation of it for Avro unmarshalling"; LOGGER.warn(returnString); throw new ContextRuntimeException(returnString); } + } - // Translate illegal characters in incoming JSON keys to legal Avro values - objectString = AvroSchemaKeyTranslationUtilities.translateIllegalKeys(objectString, false); - - // Decode the object - Object decodedObject; - try { - final JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, objectString); - decodedObject = new GenericDatumReader<GenericRecord>(avroSchema).read(null, jsonDecoder); - } catch (final Exception e) { - final String returnString = getUserKey().getID() + ": object \"" + objectString - + "\" Avro unmarshalling failed: " + e.getMessage(); - LOGGER.warn(returnString, e); - throw new ContextRuntimeException(returnString, e); - } - - // Now map the decoded object into something we can handle - return avroObjectMapper.mapFromAvro(decodedObject); + private boolean isObjectString(final Object object) { + return object != null && avroSchema.getType().equals(Schema.Type.STRING); } @Override @@ -185,21 +195,25 @@ public class AvroSchemaHelper extends AbstractSchemaHelper { // Condition the object for Avro encoding final Object conditionedObject = avroObjectMapper.mapToAvro(object); - final ByteArrayOutputStream output = new ByteArrayOutputStream(); - try { + final String jsonString = getJsonString(object, conditionedObject); + + return AvroSchemaKeyTranslationUtilities.translateIllegalKeys(jsonString, true); + } + + private String getJsonString(final Object object, final Object conditionedObject) { + + try (final ByteArrayOutputStream output = new ByteArrayOutputStream();) { final DatumWriter<Object> writer = new GenericDatumWriter<>(avroSchema); final JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(avroSchema, output, true); writer.write(conditionedObject, jsonEncoder); jsonEncoder.flush(); - output.close(); + return new String(output.toByteArray()); } catch (final Exception e) { final String returnString = getUserKey().getID() + ": object \"" + object + "\" Avro marshalling failed: " + e.getMessage(); LOGGER.warn(returnString); throw new ContextRuntimeException(returnString, e); } - - return AvroSchemaKeyTranslationUtilities.translateIllegalKeys(new String(output.toByteArray()), true); } @Override diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java index 735053279..a2069a794 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-restrequestor/src/main/java/org/onap/policy/apex/plugins/event/carrier/restrequestor/ApexRestRequestorConsumer.java @@ -38,7 +38,6 @@ import javax.ws.rs.core.Response; import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.plugins.event.carrier.restrequestor.RESTRequestorCarrierTechnologyParameters.HTTP_METHOD; import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; @@ -50,8 +49,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class implements an Apex event consumer that issues a REST request and returns the REST response to APEX as an - * event. + * This class implements an Apex event consumer that issues a REST request and returns the REST + * response to APEX as an event. * * @author Liam Fallon (liam.fallon@ericsson.com) */ @@ -59,7 +58,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexRestRequestorConsumer.class); - // The amount of time to wait in milliseconds between checks that the consumer thread has stopped + // The amount of time to wait in milliseconds between checks that the consumer thread has + // stopped private static final long REST_REQUESTOR_WAIT_SLEEP_TIME = 50; // The REST parameters read from the parameter service @@ -128,14 +128,6 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { .setHttpMethod(RESTRequestorCarrierTechnologyParameters.DEFAULT_REQUESTOR_HTTP_METHOD); } - if (!(restConsumerProperties.getHttpMethod() instanceof HTTP_METHOD)) { - final String errorMessage = "specified HTTP method of \"" + restConsumerProperties.getHttpMethod() - + "\" is invalid, only HTTP methods " + HTTP_METHOD.values() - + " are valid on REST Requestor consumer (" + this.name + ")"; - LOGGER.warn(errorMessage); - throw new ApexEventException(errorMessage); - } - // Check if the HTTP URL has been set if (restConsumerProperties.getURL() == null) { final String errorMessage = "no URL has been specified on REST Requestor consumer (" + this.name + ")"; @@ -215,8 +207,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { /* * (non-Javadoc) * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service. - * parameters.eventhandler.EventHandlerPeeredMode) + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap. + * policy.apex.service. parameters.eventhandler.EventHandlerPeeredMode) */ @Override public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { @@ -226,8 +218,9 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { /* * (non-Javadoc) * - * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service. - * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap. + * policy.apex.service. parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) */ @Override public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { @@ -256,7 +249,8 @@ public class ApexRestRequestorConsumer implements ApexEventConsumer, Runnable { // Set the time stamp of the REST request restRequest.setTimestamp(System.currentTimeMillis()); - // Create a thread to process the REST request and place it on the map of ongoing requests + // Create a thread to process the REST request and place it on the map of ongoing + // requests final RestRequestRunner restRequestRunner = new RestRequestRunner(restRequest); ongoingRestRequestMap.put(restRequest, restRequestRunner); |