aboutsummaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-06-26 15:40:41 +0000
committerliamfallon <liam.fallon@est.tech>2019-06-26 15:40:41 +0000
commitce9d82d2c0e863597d84cc8909955e398405f45a (patch)
tree8ebb853119bdf673cf6f9516d428d4cc00080aeb /services/services-engine/src/main/java/org/onap
parent5f029543f1e673655af2d2974113069df0b6def0 (diff)
Add passthrough properties for APEX engine
APEX event receiver and sender plugins sometimes need to exchange information with tasks, especially in the case of REST communication. This change enables passthrough of Properties from the event carrier technology plugins to APEX task, task selection, and state finalizer logics. Apologies for the size of the review but this change involves passinng the properties through all the APEX components, hence the large number of small changes. Issue-ID: POLICY-1742 Change-Id: I219fd69550f06702ef64adbb165fe7baac422e96 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'services/services-engine/src/main/java/org/onap')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java136
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java6
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java17
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java14
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java20
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java52
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java18
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java50
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java3
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java67
10 files changed, 139 insertions, 244 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
index 368305efa..06b9ddf53 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -23,8 +23,13 @@ package org.onap.policy.apex.service.engine.event;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
import org.onap.policy.apex.model.basicmodel.concepts.AxReferenceKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +41,9 @@ import org.slf4j.LoggerFactory;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
+@Getter
+@ToString
+@EqualsAndHashCode(callSuper = false)
public class ApexEvent extends HashMap<String, Object> implements Serializable {
private static final long serialVersionUID = -4451918242101961685L;
@@ -104,19 +112,20 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
/** The target of an Apex event must match this regular expression. */
public static final String TARGET_REGEXP = SOURCE_REGEXP;
- // The fields of the event
- // @formatter:off
+ // The standard fields of the event
private final String name;
private final String version;
private final String nameSpace;
private final String source;
private final String target;
- // @formatter:on
// An identifier for the current event execution. The default value here will always be unique
// in a single JVM
private long executionId = ApexEvent.getNextExecutionId();
+ // Event related properties used during processing of this event
+ private Properties executionProperties;
+
// A string holding a message that indicates why processing of this event threw an exception
private String exceptionMessage;
@@ -144,7 +153,7 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
/**
* Private utility to get the next candidate value for a Execution ID. This value will always be
* unique in a single JVM
- *
+ *
* @return the next candidate value for a Execution ID
*/
private static synchronized long getNextExecutionId() {
@@ -190,63 +199,10 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
}
/**
- * Gets the name.
+ * Sets the pass-thru executionID for this event.
*
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Gets the version.
- *
- * @return the version
- */
- public String getVersion() {
- return version;
- }
-
- /**
- * Gets the name space.
- *
- * @return the name space
- */
- public String getNameSpace() {
- return nameSpace;
- }
-
- /**
- * Gets the source.
- *
- * @return the source
- */
- public String getSource() {
- return source;
- }
-
- /**
- * Gets the target.
- *
- * @return the target
- */
- public String getTarget() {
- return target;
- }
-
- /**
- * Gets the pass-thru executionID for this event.
- *
- * @return the executionID
- */
- public long getExecutionId() {
- return executionId;
- }
-
- /**
- * Sets the pass-thru executionID for this event. The default value for executionID will be be
- * unique in the current JVM. For some applications/deployments this executionID may need to
- * globally unique
+ * <p>The default value for executionID is unique in the current JVM. For some applications/deployments this
+ * executionID may need to be globally unique
*
* @param executionId the executionID
*/
@@ -255,12 +211,12 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
}
/**
- * Gets the exception message explaining why processing of this event to fail.
+ * Set the execution properties for this event.
*
- * @return the exception message
+ * @param executionProperties the execution properties to set
*/
- public String getExceptionMessage() {
- return exceptionMessage;
+ public void setExecutionProperties(Properties executionProperties) {
+ this.executionProperties = executionProperties;
}
/**
@@ -318,50 +274,4 @@ public class ApexEvent extends HashMap<String, Object> implements Serializable {
// Go ahead and put everything
super.putAll(incomingMap);
}
-
- /*
- * Object overrides from here
- */
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#toString()
- */
- @Override
- public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("name=");
- builder.append(name);
- builder.append(",version=");
- builder.append(version);
- builder.append(",nameSpace=");
- builder.append(nameSpace);
- builder.append(",source=");
- builder.append(source);
- builder.append(",target=");
- builder.append(target);
- builder.append(",executionID=");
- builder.append(executionId);
- builder.append(",exceptionMessage=");
- builder.append(exceptionMessage);
- builder.append(",");
- builder.append("[");
-
- boolean firstData = true;
- for (final Map.Entry<String, Object> dataEntry : this.entrySet()) {
- if (firstData) {
- firstData = false;
- } else {
- builder.append(',');
- }
-
- builder.append(dataEntry.getKey());
- builder.append('=');
- builder.append(dataEntry.getValue());
- }
-
- builder.append("]");
- return builder.toString();
- }
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java
index 11f005ddf..a45ba2e1e 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java
index 414fbc9e3..1d6476837 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java
@@ -5,21 +5,23 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.policy.apex.service.engine.event;
+import java.util.Properties;
+
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
@@ -44,7 +46,7 @@ public interface ApexEventProducer {
/**
* Get the peered reference object for this producer.
- *
+ *
* @param peeredMode the peered mode for which to return the reference
* @return the peered reference object for this producer
*/
@@ -52,7 +54,7 @@ public interface ApexEventProducer {
/**
* Set the peered reference object for this producer.
- *
+ *
* @param peeredMode the peered mode for which to return the reference
* @param peeredReference the peered reference object for this producer
*/
@@ -62,14 +64,15 @@ public interface ApexEventProducer {
* Send an event to the producer.
*
* @param executionId the unique ID that produced this event
+ * @param executionProperties properties used during processing of this event
* @param eventName The name of the event
* @param event The converted event as an object
*/
- void sendEvent(long executionId, String eventName, Object event);
+ void sendEvent(long executionId, Properties executionProperties, String eventName, Object event);
/**
* Get the name of this event producer.
- *
+ *
* @return the event producer name
*/
String getName();
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java
index 8d7e7bae5..aed031eba 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java
@@ -5,21 +5,23 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
package org.onap.policy.apex.service.engine.event;
+import java.util.Properties;
+
/**
* This interface is used by an Apex event consumer {@link ApexEventConsumer} consumer to pass a
* received event to Apex.
@@ -31,16 +33,18 @@ public interface ApexEventReceiver {
* Receive an event from a consumer for processing.
*
* @param executionId the unique ID for execution of this event
+ * @param executionProperties properties used during processing of this event
* @param event the event to receive
* @throws ApexEventException on exceptions receiving an event into Apex
*/
- void receiveEvent(long executionId, Object event) throws ApexEventException;
+ void receiveEvent(long executionId, Properties executionProperties, Object event) throws ApexEventException;
/**
* Receive an event from a consumer for processing.
*
+ * @param executionProperties properties used during processing of this event
* @param event the event to receive
* @throws ApexEventException on exceptions receiving an event into Apex
*/
- void receiveEvent(Object event) throws ApexEventException;
+ void receiveEvent(Properties executionProperties, Object event) throws ApexEventException;
}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java
index f74751a63..8d845e0ce 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -99,7 +99,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/**
* Receive an incoming event send request from the peered event Requestor producer and queue it.
- *
+ *
* @param eventObject the incoming event to process
* @throws ApexEventRuntimeException on queueing errors
*/
@@ -117,7 +117,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
*/
@Override
@@ -131,7 +131,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
*/
@Override
@@ -141,7 +141,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/**
* Get the number of events received to date.
- *
+ *
* @return the number of events received
*/
public int getEventsReceived() {
@@ -150,7 +150,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
* policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
*/
@@ -161,7 +161,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
* policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
* org.onap.policy.apex.service.engine.event.PeeredReference)
@@ -190,7 +190,7 @@ public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
}
// Send the event into Apex
- eventReceiver.receiveEvent(eventObject);
+ eventReceiver.receiveEvent(null, eventObject);
eventsReceived++;
} catch (final InterruptedException e) {
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java
index fda48e7ca..9018b91ef 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -22,6 +22,7 @@ package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Properties;
import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
import org.onap.policy.apex.service.engine.event.ApexEventException;
@@ -35,11 +36,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Concrete implementation of an Apex event producer that sends one or more events to its peered
- * event requestor consumer.
- *
+ * Concrete implementation of an Apex event producer that sends one or more events to its peered event requestor
+ * consumer.
+ *
* @author Liam Fallon (liam.fallon@ericsson.com)
- *
+ *
*/
public class EventRequestorProducer implements ApexEventProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorProducer.class);
@@ -56,7 +57,7 @@ public class EventRequestorProducer implements ApexEventProducer {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
* org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
*/
@@ -85,7 +86,7 @@ public class EventRequestorProducer implements ApexEventProducer {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
*/
@Override
@@ -95,44 +96,35 @@ public class EventRequestorProducer implements ApexEventProducer {
/**
* Get the number of events sent to date.
- *
+ *
* @return the number of events received
*/
public int getEventsSent() {
return eventsSent;
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.
- * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ /**
+ * {@inheritDoc}
*/
@Override
public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
return peerReferenceMap.get(peeredMode);
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.
- * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
- * org.onap.policy.apex.service.engine.event.PeeredReference)
+ /**
+ * {@inheritDoc}
*/
@Override
public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
peerReferenceMap.put(peeredMode, peeredReference);
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.
- * String, java.lang.Object)
+ /**
+ * {@inheritDoc}
*/
@Override
- public void sendEvent(final long executionId, final String eventName, final Object eventObject) {
+ public void sendEvent(final long executionId, final Properties executorProperties, final String eventName,
+ final Object eventObject) {
// Check if this is a synchronized event, if so we have received a reply
final SynchronousEventCache synchronousEventCache =
(SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
@@ -166,10 +158,8 @@ public class EventRequestorProducer implements ApexEventProducer {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop()
+ /**
+ * {@inheritDoc}
*/
@Override
public void stop() {
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
index 0f0996fb8..076c031e9 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -80,7 +80,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
/**
* Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single
* JVM
- *
+ *
* @return the next candidate value for a Execution ID
*/
private static synchronized long getNextExecutionId() {
@@ -142,7 +142,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
*/
@Override
@@ -152,7 +152,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
* policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
*/
@@ -163,7 +163,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
* policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
* org.onap.policy.apex.service.engine.event.PeeredReference)
@@ -175,7 +175,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
/*
* (non-Javadoc)
- *
+ *
* @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
*/
@Override
@@ -211,7 +211,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
// Process the event from the text block if there is one there
if (textBlock.getText() != null) {
- eventReceiver.receiveEvent(getNextExecutionId(), textBlock.getText());
+ eventReceiver.receiveEvent(getNextExecutionId(), null, textBlock.getText());
}
}
while (!textBlock.isEndOfText());
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
index e12b772df..db8413c6f 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Properties;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.service.engine.event.ApexEventException;
@@ -57,10 +58,8 @@ public class ApexFileEventProducer implements ApexEventProducer {
private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
new EnumMap<>(EventHandlerPeeredMode.class);
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#init()
+ /**
+ * {@inheritDoc}
*/
@Override
public void init(final String name, final EventHandlerParameters producerParameters) throws ApexEventException {
@@ -105,47 +104,36 @@ public class ApexFileEventProducer implements ApexEventProducer {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
+ /**
+ * {@inheritDoc}
*/
@Override
public String getName() {
return producerName;
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.
- * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ /**
+ * {@inheritDoc}
*/
@Override
public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
return peerReferenceMap.get(peeredMode);
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.
- * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
- * org.onap.policy.apex.service.engine.event.PeeredReference)
+ /**
+ * {@inheritDoc}
*/
@Override
public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
peerReferenceMap.put(peeredMode, peeredReference);
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long,
- * java.lang.String, java.lang.Object)
+ /**
+ * {@inheritDoc}
*/
@Override
- public void sendEvent(final long executionId, final String eventName, final Object event) {
+ public void sendEvent(final long executionId, final Properties executionProperties, final String eventName,
+ final Object event) {
// Check if this is a synchronized event, if so we have received a reply
final SynchronousEventCache synchronousEventCache =
(SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
@@ -169,10 +157,8 @@ public class ApexFileEventProducer implements ApexEventProducer {
eventOutputStream.flush();
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ /**
+ * {@inheritDoc}
*/
@Override
public void stop() {
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
index cb4086d35..7c0c17d6b 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventMarshaller.java
@@ -186,7 +186,8 @@ public class ApexEventMarshaller implements ApexEventListener, Runnable {
// Process the next Apex event from the queue
final Object event = converter.fromApexEvent(apexEvent);
- producer.sendEvent(apexEvent.getExecutionId(), apexEvent.getName(), event);
+ producer.sendEvent(apexEvent.getExecutionId(), apexEvent.getExecutionProperties(), apexEvent.getName(),
+ event);
if (LOGGER.isTraceEnabled()) {
final String message = "event sent : " + apexEvent.toString();
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
index 97aa25fbe..6fdc0fdef 100644
--- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/main/ApexEventUnmarshaller.java
@@ -5,15 +5,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
+ *
* SPDX-License-Identifier: Apache-2.0
* ============LICENSE_END=========================================================
*/
@@ -21,6 +21,7 @@
package org.onap.policy.apex.service.engine.main;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -88,7 +89,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
* @param consumerParameters the consumer parameters for this specific unmarshaler
*/
public ApexEventUnmarshaller(final String name, final EngineServiceParameters engineServiceParameters,
- final EventHandlerParameters consumerParameters) {
+ final EventHandlerParameters consumerParameters) {
this.name = name;
this.engineServiceParameters = engineServiceParameters;
this.consumerParameters = consumerParameters;
@@ -118,8 +119,8 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
consumer.start();
// Configure and start the event reception thread
- final String threadName = engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName()
- + ":" + name;
+ final String threadName =
+ engineServiceParameters.getEngineKey().getName() + ":" + this.getClass().getName() + ":" + name;
unmarshallerThread = new ApplicationThreadFactory(threadName).newThread(this);
unmarshallerThread.setDaemon(true);
unmarshallerThread.start();
@@ -154,7 +155,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
/**
* Connect a synchronous unmarshaler with a synchronous marshaler.
- *
+ *
* @param peeredMode the peered mode under which the unmarshaler and marshaler are connected
* @param peeredMarshaller the synchronous marshaler to connect with
*/
@@ -164,7 +165,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// To connect a synchronous unmarshaler and marshaler, we create a synchronous event
// cache on the consumer/producer pair
new SynchronousEventCache(peeredMode, consumer, peeredMarshaller.getProducer(),
- consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
+ consumerParameters.getPeerTimeout(EventHandlerPeeredMode.SYNCHRONOUS));
return;
case REQUESTOR:
@@ -176,36 +177,34 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(java.lang.Object)
+ /**
+ * {@inheritDoc}
*/
@Override
- public void receiveEvent(final Object event) throws ApexEventException {
- receiveEvent(0, event, true);
+ public void receiveEvent(final Properties executionProperties, final Object event) throws ApexEventException {
+ receiveEvent(0, executionProperties, event, true);
}
- /*
- * (non-Javadoc)
- *
- * @see org.onap.policy.apex.service.engine.event.ApexEventReceiver#receiveEvent(long, java.lang.Object)
+ /**
+ * {@inheritDoc}
*/
@Override
- public void receiveEvent(final long executionId, final Object event) throws ApexEventException {
- receiveEvent(executionId, event, false);
+ public void receiveEvent(final long executionId, final Properties executionProperties, final Object event)
+ throws ApexEventException {
+ receiveEvent(executionId, executionProperties, event, false);
}
/**
* Receive an event from a consumer, convert its protocol and forward it to Apex.
*
* @param executionId the execution id the incoming execution ID
+ * @param executionProperties properties used during processing of this event
* @param event the event in its native format
* @param generateExecutionId if true, let Apex generate the execution ID, if false, use the incoming execution ID
* @throws ApexEventException on unmarshaling errors on events
*/
- private void receiveEvent(final long executionId, final Object event, final boolean generateExecutionId)
- throws ApexEventException {
+ private void receiveEvent(final long executionId, final Properties executionProperties, final Object event,
+ final boolean generateExecutionId) throws ApexEventException {
// Push the event onto the queue
if (LOGGER.isTraceEnabled()) {
String eventString = "onMessage(): event received: " + event.toString();
@@ -215,6 +214,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Convert the incoming events to Apex events
try {
final List<ApexEvent> apexEventList = converter.toApexEvent(consumerParameters.getEventName(), event);
+
for (final ApexEvent apexEvent : apexEventList) {
isEventFilteredOut(apexEvent);
@@ -228,19 +228,21 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
apexEvent.setExecutionId(executionId);
}
+ apexEvent.setExecutionProperties(executionProperties);
+
// Enqueue the event
queue.add(apexEvent);
// Cache synchronized events that are sent
if (consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)) {
- final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) consumer
- .getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
+ final SynchronousEventCache synchronousEventCache =
+ (SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS);
synchronousEventCache.cacheSynchronizedEventToApex(apexEvent.getExecutionId(), apexEvent);
}
}
} catch (final ApexException e) {
final String errorMessage = "Error while converting event into an ApexEvent for " + name + ": "
- + e.getMessage() + ", Event=" + event;
+ + e.getMessage() + ", Event=" + event;
LOGGER.warn(errorMessage, e);
throw new ApexEventException(errorMessage, e);
}
@@ -248,23 +250,22 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
/**
* Check if an event is filtered out and ignored.
- *
+ *
* @param apexEvent the event to check
*/
private boolean isEventFilteredOut(final ApexEvent apexEvent) {
// Check if we are filtering events on this unmarshaler, if so check the event name
// against the filter
if (consumerParameters.isSetEventNameFilter()
- && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
-
+ && !apexEvent.getName().matches(consumerParameters.getEventNameFilter())) {
+
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("onMessage(): event {} not processed, filtered out by filter", apexEvent,
- consumerParameters.getEventNameFilter());
+ consumerParameters.getEventNameFilter());
}
-
+
return true;
- }
- else {
+ } else {
return false;
}
}
@@ -325,7 +326,7 @@ public class ApexEventUnmarshaller implements ApexEventReceiver, Runnable {
// Order a stop on the synchronous cache if one exists
if (consumerParameters != null && consumerParameters.isPeeredMode(EventHandlerPeeredMode.SYNCHRONOUS)
- && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
+ && consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS) != null) {
((SynchronousEventCache) consumer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)).stop();
}