diff options
Diffstat (limited to 'plugins')
7 files changed, 199 insertions, 84 deletions
diff --git a/plugins/plugins-context/plugins-context-schema/plugins-context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java b/plugins/plugins-context/plugins-context-schema/plugins-context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java index ed2b521c4..015d9ea30 100644 --- a/plugins/plugins-context/plugins-context-schema/plugins-context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java +++ b/plugins/plugins-context/plugins-context-schema/plugins-context-schema-avro/src/main/java/org/onap/policy/apex/plugins/context/schema/avro/AvroSchemaHelper.java @@ -166,21 +166,7 @@ public class AvroSchemaHelper extends AbstractSchemaHelper { private String getStringObject(final Object object) { try { if (isObjectString(object)) { - String objectString = object.toString().trim(); - if (objectString.length() == 0) { - return "\"\""; - } else if (objectString.length() == 1) { - return "\"" + objectString + "\""; - } else { - // All strings must be quoted for decoding - if (objectString.charAt(0) != '"') { - objectString = '"' + objectString; - } - if (objectString.charAt(objectString.length() - 1) != '"') { - objectString += '"'; - } - } - return objectString; + return getObjectString(object); } else { return (String) object; } @@ -194,6 +180,30 @@ public class AvroSchemaHelper extends AbstractSchemaHelper { } } + /** + * Get a string object. + * + * @param object the string object + * @return the string + */ + private String getObjectString(final Object object) { + String objectString = object.toString().trim(); + if (objectString.length() == 0) { + return "\"\""; + } else if (objectString.length() == 1) { + return "\"" + objectString + "\""; + } else { + // All strings must be quoted for decoding + if (objectString.charAt(0) != '"') { + objectString = '"' + objectString; + } + if (objectString.charAt(objectString.length() - 1) != '"') { + objectString += '"'; + } + } + return objectString; + } + private boolean isObjectString(final Object object) { return object != null && avroSchema.getType().equals(Schema.Type.STRING); } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java index 8b120ca3a..d4cb2ab9a 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java @@ -328,38 +328,38 @@ public class JmsCarrierTechnologyParameters extends CarrierTechnologyParameters public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); - if (initialContextFactory == null || initialContextFactory.trim().length() == 0) { + if (isNullOrBlank(initialContextFactory)) { result.setResult("initialContextFactory", ValidationStatus.INVALID, "initialContextFactory must be specified as a string that is a class that implements the " + "interface org.jboss.naming.remote.client.InitialContextFactory"); } - if (providerUrl == null || providerUrl.trim().length() == 0) { + if (isNullOrBlank(providerUrl)) { result.setResult("providerUrl", ValidationStatus.INVALID, "providerUrl must be specified as a URL string that specifies the location of " + "configuration information for the service provider to use " + "such as remote://localhost:4447"); } - if (securityPrincipal == null || securityPrincipal.trim().length() == 0) { + if (isNullOrBlank(securityPrincipal)) { result.setResult("securityPrincipal", ValidationStatus.INVALID, "securityPrincipal must be specified the identity of the principal for authenticating " + "the caller to the service"); } - if (securityCredentials == null || securityCredentials.trim().length() == 0) { + if (isNullOrBlank(securityCredentials)) { result.setResult("securityCredentials", ValidationStatus.INVALID, " securityCredentials must be specified as the credentials of the " + "principal for authenticating the caller to the service"); } - if (producerTopic == null || producerTopic.trim().length() == 0) { + if (isNullOrBlank(producerTopic)) { result.setResult("producerTopic", ValidationStatus.INVALID, " producerTopic must be a string that identifies the JMS topic " + "on which Apex will send events"); } - if (consumerTopic == null || consumerTopic.trim().length() == 0) { + if (isNullOrBlank(consumerTopic)) { result.setResult("consumerTopic", ValidationStatus.INVALID, " consumerTopic must be a string that identifies the JMS topic " + "on which Apex will recieve events"); @@ -372,4 +372,14 @@ public class JmsCarrierTechnologyParameters extends CarrierTechnologyParameters return result; } + + /** + * Check if the string is null or blank. + * + * @param stringValue the string value + * @return + */ + private boolean isNullOrBlank(final String stringValue) { + return stringValue == null || stringValue.trim().length() == 0; + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 94ac2bcca..affd10ccb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -166,10 +166,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { final ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord<String, String> record : records) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); - } + traceIfTraceEnabled(record); eventReceiver.receiveEvent(record.value()); } } catch (final Exception e) { @@ -182,6 +179,18 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { } } + /** + * Trace a record if trace is enabled. + * + * @param record the record to trace + */ + private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, record.key(), record.value()); + } + } + /* * (non-Javadoc) * diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 7c24ce1aa..851741611 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -62,10 +62,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String DEFAULT_PROD_TOPIC = "apex-out"; private static final int DEFAULT_CONS_POLL_TIME = 100; private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; - private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -97,10 +95,10 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private String producerTopic = DEFAULT_PROD_TOPIC; private int consumerPollTime = DEFAULT_CONS_POLL_TIME; private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; - private String keySerializer = DEFAULT_KEY_SERZER; - private String valueSerializer = DEFAULT_VAL_SERZER; - private String keyDeserializer = DEFAULT_KEY_DESZER; - private String valueDeserializer = DEFAULT_VALUE_DESZER; + private String keySerializer = DEFAULT_STRING_SERZER; + private String valueSerializer = DEFAULT_STRING_SERZER; + private String keyDeserializer = DEFAULT_STRING_DESZER; + private String valueDeserializer = DEFAULT_STRING_DESZER; // @formatter:on /** @@ -325,6 +323,23 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); + validateStringParameters(result); + + validateNumericParameters(result); + + validateConsumerTopicList(result); + + validateSerializersAndDeserializers(result); + + return result; + } + + /** + * Validate that string parameters are correct. + * + * @param result the result of the validation + */ + private void validateStringParameters(final GroupValidationResult result) { if (isNullOrBlank(bootstrapServers)) { result.setResult("bootstrapServers", ValidationStatus.INVALID, "not specified, must be specified as a string of form host:port"); @@ -335,6 +350,22 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "not specified, must be specified as a string with values [0|1|all]"); } + if (isNullOrBlank(groupId)) { + result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(producerTopic)) { + result.setResult("producerTopic", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + } + + /** + * Check if numeric parameters are valid. + * + * @param result the result of the validation + */ + private void validateNumericParameters(final GroupValidationResult result) { if (retries < 0) { result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, "[" + retries + "] invalid, must be specified as retries >= 0"); @@ -355,10 +386,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); } - if (isNullOrBlank(groupId)) { - result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); - } - if (autoCommitTime < 0) { result.setResult("autoCommitTime", ValidationStatus.INVALID, "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); @@ -369,18 +396,18 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); } - if (isNullOrBlank(producerTopic)) { - result.setResult("producerTopic", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - if (consumerPollTime < 0) { result.setResult("consumerPollTime", ValidationStatus.INVALID, "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); } + } - validateConsumerTopicList(result); - + /** + * Validate the serializers and deserializers. + * + * @param result the result of the validation. + */ + private void validateSerializersAndDeserializers(final GroupValidationResult result) { if (isNullOrBlank(keySerializer)) { result.setResult("keySerializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); @@ -400,8 +427,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter result.setResult("valueDeserializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); } - - return result; } private void validateConsumerTopicList(final GroupValidationResult result) { diff --git a/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java b/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java index 4bf10e4ae..f3afdde32 100644 --- a/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java +++ b/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java @@ -267,6 +267,23 @@ public class Apex2YamlEventConverter implements ApexEventProtocolConverter { version = eventDefinition.getKey().getVersion(); } + String namespace = getEventHeaderNamespace(yamlMap, name, eventDefinition); + + String source = getEventHeaderSource(yamlMap, eventDefinition); + + String target = getHeaderTarget(yamlMap, eventDefinition); + + return new ApexEvent(name, version, namespace, source, target); + } + + /** + * Get the event header name space. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header name space + */ + private String getEventHeaderNamespace(final Map<?, ?> yamlMap, String name, final AxEvent eventDefinition) { // Check the name space is OK if it is defined, if not, use the name space from the model String namespace = getYamlStringField(yamlMap, ApexEvent.NAMESPACE_HEADER_FIELD, yamlPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false); @@ -279,22 +296,41 @@ public class Apex2YamlEventConverter implements ApexEventProtocolConverter { } else { namespace = eventDefinition.getNameSpace(); } + return namespace; + } + /** + * Get the event header source. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header source + */ + private String getEventHeaderSource(final Map<?, ?> yamlMap, final AxEvent eventDefinition) { // For source, use the defined source only if the source is not found on the incoming event String source = getYamlStringField(yamlMap, ApexEvent.SOURCE_HEADER_FIELD, yamlPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false); if (source == null) { source = eventDefinition.getSource(); } + return source; + } + /** + * Get the event header target. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header target + */ + private String getHeaderTarget(final Map<?, ?> yamlMap, final AxEvent eventDefinition) { // For target, use the defined source only if the source is not found on the incoming event String target = getYamlStringField(yamlMap, ApexEvent.TARGET_HEADER_FIELD, yamlPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false); if (target == null) { target = eventDefinition.getTarget(); } - - return new ApexEvent(name, version, namespace, source, target); + return target; } /** diff --git a/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskExecutor.java b/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskExecutor.java index 39ca0dc43..bddb63b42 100644 --- a/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskExecutor.java +++ b/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskExecutor.java @@ -100,23 +100,7 @@ public class JythonTaskExecutor extends TaskExecutor { // Set up the Jython engine interpreter.set("executor", getExecutionContext()); interpreter.exec(compiled); - try { - final Object ret = interpreter.get("returnValue", java.lang.Boolean.class); - if (ret == null) { - LOGGER.error("execute: task logic failed to set a return value for task \"" - + getSubject().getKey().getId() + "\""); - throw new StateMachineException("execute: task logic failed to set a return value for task \"" - + getSubject().getKey().getId() + "\""); - } - returnValue = (Boolean) ret; - } catch (NullPointerException | ClassCastException e) { - LOGGER.error("execute: task selection logic failed to set a correct return value for state \"" - + getSubject().getKey().getId() + "\"", e); - throw new StateMachineException( - "execute: task selection logic failed to set a return value for state \"" - + getSubject().getKey().getId() + "\"", - e); - } + returnValue = handleInterpreterResult(); } /* */ } catch (final Exception e) { @@ -137,6 +121,35 @@ public class JythonTaskExecutor extends TaskExecutor { } /** + * Handle the result returned by the interpreter. + * + * @return true if the result was successful + * @throws StateMachineException on interpreter failures + */ + private boolean handleInterpreterResult() throws StateMachineException { + boolean returnValue = false; + + try { + final Object ret = interpreter.get("returnValue", java.lang.Boolean.class); + if (ret == null) { + LOGGER.error("execute: task logic failed to set a return value for task \"" + + getSubject().getKey().getId() + "\""); + throw new StateMachineException("execute: task logic failed to set a return value for task \"" + + getSubject().getKey().getId() + "\""); + } + returnValue = (Boolean) ret; + } catch (NullPointerException | ClassCastException e) { + LOGGER.error("execute: task selection logic failed to set a correct return value for state \"" + + getSubject().getKey().getId() + "\"", e); + throw new StateMachineException( + "execute: task selection logic failed to set a return value for state \"" + + getSubject().getKey().getId() + "\"", + e); + } + return returnValue; + } + + /** * Cleans up the task after processing. * * @throws StateMachineException thrown when a state machine execution error occurs diff --git a/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskSelectExecutor.java b/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskSelectExecutor.java index 3ff061fa4..9a2433122 100644 --- a/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskSelectExecutor.java +++ b/plugins/plugins-executor/plugins-executor-jython/src/main/java/org/onap/policy/apex/plugins/executor/jython/JythonTaskSelectExecutor.java @@ -103,25 +103,7 @@ public class JythonTaskSelectExecutor extends TaskSelectExecutor { // Set up the Jython engine interpreter.set("executor", getExecutionContext()); interpreter.exec(compiled); - - try { - final Object ret = interpreter.get("returnValue", java.lang.Boolean.class); - if (ret == null) { - LOGGER.error(TSL_FAILED_PREFIX - + getSubject().getKey().getId() + "\""); - throw new StateMachineException( - TSL_FAILED_PREFIX - + getSubject().getKey().getId() + "\""); - } - returnValue = (Boolean) ret; - } catch (NullPointerException | ClassCastException e) { - LOGGER.error("execute: task selection logic failed to set a correct return value for state \"" - + getSubject().getKey().getId() + "\"", e); - throw new StateMachineException( - TSL_FAILED_PREFIX - + getSubject().getKey().getId() + "\"", - e); - } + returnValue = handleInterpreterResult(); } /* */ } catch (final Exception e) { @@ -143,6 +125,36 @@ public class JythonTaskSelectExecutor extends TaskSelectExecutor { } /** + * Handle the result returned by the interpreter. + * + * @return true if the result was successful + * @throws StateMachineException on interpreter errors + */ + private boolean handleInterpreterResult() throws StateMachineException { + boolean returnValue = false; + + try { + final Object ret = interpreter.get("returnValue", java.lang.Boolean.class); + if (ret == null) { + LOGGER.error(TSL_FAILED_PREFIX + + getSubject().getKey().getId() + "\""); + throw new StateMachineException( + TSL_FAILED_PREFIX + + getSubject().getKey().getId() + "\""); + } + returnValue = (Boolean) ret; + } catch (NullPointerException | ClassCastException e) { + LOGGER.error("execute: task selection logic failed to set a correct return value for state \"" + + getSubject().getKey().getId() + "\"", e); + throw new StateMachineException( + TSL_FAILED_PREFIX + + getSubject().getKey().getId() + "\"", + e); + } + return returnValue; + } + + /** * Cleans up the task after processing. * * @throws StateMachineException thrown when a state machine execution error occurs |