diff options
Diffstat (limited to 'plugins/plugins-event')
4 files changed, 113 insertions, 33 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java index 8b120ca3a..d4cb2ab9a 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-jms/src/main/java/org/onap/policy/apex/plugins/event/carrier/jms/JmsCarrierTechnologyParameters.java @@ -328,38 +328,38 @@ public class JmsCarrierTechnologyParameters extends CarrierTechnologyParameters public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); - if (initialContextFactory == null || initialContextFactory.trim().length() == 0) { + if (isNullOrBlank(initialContextFactory)) { result.setResult("initialContextFactory", ValidationStatus.INVALID, "initialContextFactory must be specified as a string that is a class that implements the " + "interface org.jboss.naming.remote.client.InitialContextFactory"); } - if (providerUrl == null || providerUrl.trim().length() == 0) { + if (isNullOrBlank(providerUrl)) { result.setResult("providerUrl", ValidationStatus.INVALID, "providerUrl must be specified as a URL string that specifies the location of " + "configuration information for the service provider to use " + "such as remote://localhost:4447"); } - if (securityPrincipal == null || securityPrincipal.trim().length() == 0) { + if (isNullOrBlank(securityPrincipal)) { result.setResult("securityPrincipal", ValidationStatus.INVALID, "securityPrincipal must be specified the identity of the principal for authenticating " + "the caller to the service"); } - if (securityCredentials == null || securityCredentials.trim().length() == 0) { + if (isNullOrBlank(securityCredentials)) { result.setResult("securityCredentials", ValidationStatus.INVALID, " securityCredentials must be specified as the credentials of the " + "principal for authenticating the caller to the service"); } - if (producerTopic == null || producerTopic.trim().length() == 0) { + if (isNullOrBlank(producerTopic)) { result.setResult("producerTopic", ValidationStatus.INVALID, " producerTopic must be a string that identifies the JMS topic " + "on which Apex will send events"); } - if (consumerTopic == null || consumerTopic.trim().length() == 0) { + if (isNullOrBlank(consumerTopic)) { result.setResult("consumerTopic", ValidationStatus.INVALID, " consumerTopic must be a string that identifies the JMS topic " + "on which Apex will recieve events"); @@ -372,4 +372,14 @@ public class JmsCarrierTechnologyParameters extends CarrierTechnologyParameters return result; } + + /** + * Check if the string is null or blank. + * + * @param stringValue the string value + * @return + */ + private boolean isNullOrBlank(final String stringValue) { + return stringValue == null || stringValue.trim().length() == 0; + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 94ac2bcca..affd10ccb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -166,10 +166,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { final ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord<String, String> record : records) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); - } + traceIfTraceEnabled(record); eventReceiver.receiveEvent(record.value()); } } catch (final Exception e) { @@ -182,6 +179,18 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { } } + /** + * Trace a record if trace is enabled. + * + * @param record the record to trace + */ + private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, record.key(), record.value()); + } + } + /* * (non-Javadoc) * diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 7c24ce1aa..851741611 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -62,10 +62,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String DEFAULT_PROD_TOPIC = "apex-out"; private static final int DEFAULT_CONS_POLL_TIME = 100; private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; - private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -97,10 +95,10 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private String producerTopic = DEFAULT_PROD_TOPIC; private int consumerPollTime = DEFAULT_CONS_POLL_TIME; private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; - private String keySerializer = DEFAULT_KEY_SERZER; - private String valueSerializer = DEFAULT_VAL_SERZER; - private String keyDeserializer = DEFAULT_KEY_DESZER; - private String valueDeserializer = DEFAULT_VALUE_DESZER; + private String keySerializer = DEFAULT_STRING_SERZER; + private String valueSerializer = DEFAULT_STRING_SERZER; + private String keyDeserializer = DEFAULT_STRING_DESZER; + private String valueDeserializer = DEFAULT_STRING_DESZER; // @formatter:on /** @@ -325,6 +323,23 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); + validateStringParameters(result); + + validateNumericParameters(result); + + validateConsumerTopicList(result); + + validateSerializersAndDeserializers(result); + + return result; + } + + /** + * Validate that string parameters are correct. + * + * @param result the result of the validation + */ + private void validateStringParameters(final GroupValidationResult result) { if (isNullOrBlank(bootstrapServers)) { result.setResult("bootstrapServers", ValidationStatus.INVALID, "not specified, must be specified as a string of form host:port"); @@ -335,6 +350,22 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "not specified, must be specified as a string with values [0|1|all]"); } + if (isNullOrBlank(groupId)) { + result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(producerTopic)) { + result.setResult("producerTopic", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + } + + /** + * Check if numeric parameters are valid. + * + * @param result the result of the validation + */ + private void validateNumericParameters(final GroupValidationResult result) { if (retries < 0) { result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, "[" + retries + "] invalid, must be specified as retries >= 0"); @@ -355,10 +386,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); } - if (isNullOrBlank(groupId)) { - result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); - } - if (autoCommitTime < 0) { result.setResult("autoCommitTime", ValidationStatus.INVALID, "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); @@ -369,18 +396,18 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); } - if (isNullOrBlank(producerTopic)) { - result.setResult("producerTopic", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - if (consumerPollTime < 0) { result.setResult("consumerPollTime", ValidationStatus.INVALID, "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); } + } - validateConsumerTopicList(result); - + /** + * Validate the serializers and deserializers. + * + * @param result the result of the validation. + */ + private void validateSerializersAndDeserializers(final GroupValidationResult result) { if (isNullOrBlank(keySerializer)) { result.setResult("keySerializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); @@ -400,8 +427,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter result.setResult("valueDeserializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); } - - return result; } private void validateConsumerTopicList(final GroupValidationResult result) { diff --git a/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java b/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java index 4bf10e4ae..f3afdde32 100644 --- a/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java +++ b/plugins/plugins-event/plugins-event-protocol/plugins-event-protocol-yaml/src/main/java/org/onap/policy/apex/plugins/event/protocol/yaml/Apex2YamlEventConverter.java @@ -267,6 +267,23 @@ public class Apex2YamlEventConverter implements ApexEventProtocolConverter { version = eventDefinition.getKey().getVersion(); } + String namespace = getEventHeaderNamespace(yamlMap, name, eventDefinition); + + String source = getEventHeaderSource(yamlMap, eventDefinition); + + String target = getHeaderTarget(yamlMap, eventDefinition); + + return new ApexEvent(name, version, namespace, source, target); + } + + /** + * Get the event header name space. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header name space + */ + private String getEventHeaderNamespace(final Map<?, ?> yamlMap, String name, final AxEvent eventDefinition) { // Check the name space is OK if it is defined, if not, use the name space from the model String namespace = getYamlStringField(yamlMap, ApexEvent.NAMESPACE_HEADER_FIELD, yamlPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false); @@ -279,22 +296,41 @@ public class Apex2YamlEventConverter implements ApexEventProtocolConverter { } else { namespace = eventDefinition.getNameSpace(); } + return namespace; + } + /** + * Get the event header source. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header source + */ + private String getEventHeaderSource(final Map<?, ?> yamlMap, final AxEvent eventDefinition) { // For source, use the defined source only if the source is not found on the incoming event String source = getYamlStringField(yamlMap, ApexEvent.SOURCE_HEADER_FIELD, yamlPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false); if (source == null) { source = eventDefinition.getSource(); } + return source; + } + /** + * Get the event header target. + * + * @param yamlMap the YAML map to read from + * @param eventDefinition the event definition + * @return the event header target + */ + private String getHeaderTarget(final Map<?, ?> yamlMap, final AxEvent eventDefinition) { // For target, use the defined source only if the source is not found on the incoming event String target = getYamlStringField(yamlMap, ApexEvent.TARGET_HEADER_FIELD, yamlPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false); if (target == null) { target = eventDefinition.getTarget(); } - - return new ApexEvent(name, version, namespace, source, target); + return target; } /** |