summaryrefslogtreecommitdiffstats
path: root/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event
diff options
context:
space:
mode:
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event')
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java342
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java80
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java55
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java51
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java32
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java81
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java39
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java46
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java51
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java176
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java70
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java294
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java83
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java82
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java76
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java140
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java58
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java27
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java143
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java28
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java67
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java218
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java178
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java26
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java208
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java247
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java141
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java167
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java78
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java46
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java80
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java27
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java27
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java181
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java27
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java433
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java135
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java27
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java30
-rw-r--r--services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java32
40 files changed, 4329 insertions, 0 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
new file mode 100644
index 000000000..552f949a2
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEvent.java
@@ -0,0 +1,342 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class ApexEvent is an event class that external systems use to send events to and receive events from Apex engines. The event itself is a hash map of
+ * string keys and object values, used to pass data.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexEvent extends HashMap<String, Object> implements Serializable {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent.class);
+
+ private static final long serialVersionUID = -4451918242101961685L;
+
+ // Holds the next identifier for event execution.
+ private static AtomicLong nextExecutionID = new AtomicLong(0L);
+
+ /** The name of the Apex event, a mandatory field. All Apex events must have a name so that the event can be looked up in the Apex policy model. */
+ public static final String NAME_HEADER_FIELD = "name";
+
+ /**
+ * The version of the Apex event, an optional field. If a version is specified on an Apex event, the definition of that version of the event is taken from
+ * the Apex policy model. If no version is specified, the latest version of the event is used.
+ */
+ public static final String VERSION_HEADER_FIELD = "version";
+
+ /**
+ * The name space of the Apex event, an optional field. If a name space is specified on an Apex event it must match the name space on the event definition
+ * taken from the Apex policy model. If no name space is specified, the name space from the event definition in the Apex policy model is used.
+ */
+ public static final String NAMESPACE_HEADER_FIELD = "nameSpace";
+
+ /**
+ * The source of the Apex event, an optional field. It specifies where the Apex event has come from and its use is reserved for now. If no source is
+ * specified, the source from the event definition in the Apex policy model is used.
+ */
+ public static final String SOURCE_HEADER_FIELD = "source";
+
+ /**
+ * The target of the Apex event, an optional field. It specifies where the Apex event is going to and its use is reserved for now. If no target is
+ * specified, the target from the event definition in the Apex policy model is used.
+ */
+ public static final String TARGET_HEADER_FIELD = "target";
+
+ /**
+ * The exception message field of an Apex event is an exception message indicating that an event failed.
+ */
+ public static final String EXCEPTION_MESSAGE_HEADER_FIELD = "exceptionMessage";
+
+ /** The name of an Apex event must match this regular expression. */
+ public static final String NAME_REGEXP = "[A-Za-z0-9\\-_.]+";
+
+ /** The version of an Apex event must match this regular expression. */
+ public static final String VERSION_REGEXP = "[A-Za-z0-9.]+";
+
+ /** The name space of an Apex event must match this regular expression. */
+ public static final String NAMESPACE_REGEXP = "([a-zA_Z_][\\.\\w]*)";
+
+ /** The source of an Apex event must match this regular expression. */
+ public static final String SOURCE_REGEXP = "^$|[A-Za-z0-9\\.\\-_:]+";
+
+ /** The target of an Apex event must match this regular expression. */
+ public static final String TARGET_REGEXP = "^$|[A-Za-z0-9\\.\\-_:]+";
+
+ // The fields of the event
+ // @formatter:off
+ private final String name;
+ private final String version;
+ private final String nameSpace;
+ private final String source;
+ private final String target;
+ // @formatter:on
+
+ // An identifier for the current event execution. The default value here will always be unique in a single JVM
+ private long executionID = ApexEvent.getNextExecutionID();
+
+ // A string holding a message that indicates why processing of this event threw an exception
+ private String exceptionMessage;
+
+ /**
+ * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single JVM
+ *
+ * @return the next candidate value for a Execution ID
+ */
+ private static synchronized long getNextExecutionID() {
+ return nextExecutionID.getAndIncrement();
+ }
+
+ /**
+ * Instantiates a new apex event.
+ *
+ * @param name the name of the event
+ * @param version the version of the event
+ * @param nameSpace the name space (java package) of the event
+ * @param source the source of the event
+ * @param target the target of the event
+ * @throws ApexEventException thrown on validation errors on event names and versions
+ */
+ public ApexEvent(final String name, final String version, final String nameSpace, final String source, final String target) throws ApexEventException {
+ // @formatter:off
+ this.name = validateField("name", name, NAME_REGEXP);
+ this.version = validateField("version", version, VERSION_REGEXP);
+ this.nameSpace = validateField("nameSpace", nameSpace, NAMESPACE_REGEXP);
+ this.source = validateField("source", source, SOURCE_REGEXP);
+ this.target = validateField("target", target, TARGET_REGEXP);
+ // @formatter:on
+ }
+
+ /**
+ * Check that a field of the event is valid.
+ *
+ * @param fieldName the name of the field to check
+ * @param fieldValue the value of the field to check
+ * @param fieldRegexp the regular expression to check the field against
+ * @return the validated field value
+ * @throws ApexEventException thrown if the field is invalid
+ */
+ private String validateField(final String fieldName, final String fieldValue, final String fieldRegexp) throws ApexEventException {
+ if (fieldValue.matches(fieldRegexp)) {
+ return fieldValue;
+ }
+ else {
+ LOGGER.warn("event \"" + name + ": field \"" + fieldName + "=" + fieldValue + "\" is illegal. It doesn't match regex '" + fieldRegexp + "'");
+ throw new ApexEventException("event \"" + name + ": field \"" + fieldName + "=" + fieldValue + "\" is illegal");
+ }
+ }
+
+ /**
+ * Check that the key of an event is valid.
+ *
+ * @param key the key
+ * @return the string
+ * @throws ApexEventException the apex event exception
+ */
+ private String validKey(final String key) throws ApexEventException {
+ if (key.matches(NAME_REGEXP)) {
+ return key;
+ }
+ else {
+ LOGGER.warn("event \"" + name + ": key \"" + key + "\" is illegal");
+ throw new ApexEventException("event \"" + name + ": key \"" + key + "\" is illegal");
+ }
+ }
+
+ /**
+ * Gets the name.
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Gets the version.
+ *
+ * @return the version
+ */
+ public String getVersion() {
+ return version;
+ }
+
+ /**
+ * Gets the name space.
+ *
+ * @return the name space
+ */
+ public String getNameSpace() {
+ return nameSpace;
+ }
+
+ /**
+ * Gets the source.
+ *
+ * @return the source
+ */
+ public String getSource() {
+ return source;
+ }
+
+ /**
+ * Gets the target.
+ *
+ * @return the target
+ */
+ public String getTarget() {
+ return target;
+ }
+
+ /**
+ * Gets the pass-thru executionID for this event.
+ *
+ * @return the executionID
+ */
+ public long getExecutionID() {
+ return executionID;
+ }
+
+ /**
+ * Sets the pass-thru executionID for this event. The default value for executionID will be be unique in the current JVM. For some applications/deployments
+ * this executionID may need to globally unique
+ *
+ * @param executionID the executionID
+ */
+ public void setExecutionID(final long executionID) {
+ this.executionID = executionID;
+ }
+
+ /**
+ * Gets the exception message explaining why processing of this event to fail.
+ *
+ * @return the exception message
+ */
+ public String getExceptionMessage() {
+ return exceptionMessage;
+ }
+
+ /**
+ * Sets the exception message explaining why processing of this event to fail.
+ *
+ * @param exceptionMessage the exception message
+ */
+ public void setExceptionMessage(final String exceptionMessage) {
+ this.exceptionMessage = exceptionMessage;
+ }
+
+ /*
+ * Map overrides from here
+ */
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Map#put(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public Object put(final String key, final Object value) {
+ // Check if the key is valid
+ try {
+ return super.put(validKey(key), value);
+ }
+ catch (final ApexEventException e) {
+ return null;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Map#putAll(java.util.Map)
+ */
+ @Override
+ public void putAll(final Map<? extends String, ? extends Object> incomingMap) {
+ // Check the keys are valid
+ try {
+ for (final String key : incomingMap.keySet()) {
+ validKey(key);
+ }
+ }
+ catch (final ApexEventException e) {
+ // One of the keys is invalid
+ return;
+ }
+
+ // Go ahead and put everything
+ super.putAll(incomingMap);
+ }
+
+ /*
+ * Object overrides from here
+ */
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("name=");
+ builder.append(name);
+ builder.append(",version=");
+ builder.append(version);
+ builder.append(",nameSpace=");
+ builder.append(nameSpace);
+ builder.append(",source=");
+ builder.append(source);
+ builder.append(",target=");
+ builder.append(target);
+ builder.append(",executionID=");
+ builder.append(executionID);
+ builder.append(",exceptionMessage=");
+ builder.append(exceptionMessage);
+ builder.append(",");
+ builder.append("[");
+
+ boolean firstData = true;
+ for (final Map.Entry<String, Object> dataEntry : this.entrySet()) {
+ if (firstData) {
+ firstData = false;
+ }
+ else {
+ builder.append(',');
+ }
+
+ builder.append(dataEntry.getKey());
+ builder.append('=');
+ builder.append(dataEntry.getValue());
+ }
+
+ builder.append("]");
+ return builder.toString();
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java
new file mode 100644
index 000000000..53f11dd61
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConsumer.java
@@ -0,0 +1,80 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+
+/**
+ * This interface is used by technology specific consumers and listeners that are are listening for
+ * or collecting events for input into Apex. Users specify the consumer technology to use in the
+ * Apex configuration and Apex uses a factory to start the appropriate consumer plugin that
+ * implements this interface for its input. The technology specific implementation details are
+ * hidden behind this interface.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface ApexEventConsumer {
+ /**
+ * Initialize the consumer.
+ *
+ * @param name a name for this consumer
+ * @param consumerParameters the parameters to initialize this consumer
+ * @param apexEventReceiver the apex event receiver that should be used to pass events received
+ * by the consumer into Apex
+ * @throws ApexEventException container exception on errors initializing event handling
+ */
+ void init(String name, EventHandlerParameters consumerParameters, ApexEventReceiver apexEventReceiver)
+ throws ApexEventException;
+
+ /**
+ * Start the consumer, start input of events into Apex.
+ */
+ void start();
+
+ /**
+ * Get the peered reference object for this consumer.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @return the peered reference object for this consumer
+ */
+ PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode);
+
+ /**
+ * Set the peered reference object for this consumer.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @param peeredReference the peered reference object for this consumer
+ */
+ void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference);
+
+ /**
+ * Get the name of this event consumer.
+ *
+ * @return the event consumer name
+ */
+ String getName();
+
+ /**
+ * Stop the event consumer.
+ */
+ void stop();
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java
new file mode 100644
index 000000000..11f005ddf
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventConverter.java
@@ -0,0 +1,55 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.List;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+
+/**
+ * The Interface ApexEventConverter is used for applications that want to convert arbitrary event
+ * types to and from Apex events. Application implement this interface to convert their events to
+ * and from Apex events.The Apex service can then use this interface to transparently transfer
+ * events into and out of an Apex system.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface ApexEventConverter {
+
+ /**
+ * Convert an event of arbitrary type into an Apex event.
+ *
+ * @param name the name of the incoming event
+ * @param eventOfOtherType the event of some other type to convert
+ * @return the apex event
+ * @throws ApexException thrown on conversion errors
+ */
+ List<ApexEvent> toApexEvent(String name, Object eventOfOtherType) throws ApexException;
+
+ /**
+ * Convert an Apex event into an event of arbitrary type {@code OTHER_EVENT_TYPE}.
+ *
+ * @param apexEvent the apex event to convert
+ * @return the event converted into the other type
+ * @throws ApexException thrown on conversion errors
+ */
+ Object fromApexEvent(ApexEvent apexEvent) throws ApexException;
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java
new file mode 100644
index 000000000..24f57d741
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventException.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+
+/**
+ * This class will be called if an error occurs in handling Apex events.
+ *
+ * @author eeilfn
+ */
+public class ApexEventException extends ApexException {
+ private static final long serialVersionUID = -4245694568321686450L;
+
+ /**
+ * Instantiates a new apex event exception.
+ *
+ * @param message the message
+ */
+ public ApexEventException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiates a new apex event exception.
+ *
+ * @param message the message
+ * @param e the e
+ */
+ public ApexEventException(final String message, final Exception e) {
+ super(message, e);
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java
new file mode 100644
index 000000000..9fe03ef47
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventList.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.ArrayList;
+
+/**
+ * The Class ApexEventList holds a list of APEX events.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexEventList extends ArrayList<ApexEvent> {
+ private static final long serialVersionUID = -8496211897512202896L;
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java
new file mode 100644
index 000000000..414fbc9e3
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProducer.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+
+/**
+ * This interface is used by technology specific producers and publishers that are handling events
+ * output by Apex. Users specify the producer technology to use in the Apex configuration and Apex
+ * uses a factory to start the appropriate producer plugin that implements this interface for its
+ * output. The technology specific implementation details are hidden behind this interface.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface ApexEventProducer {
+
+ /**
+ * Initialize the producer.
+ *
+ * @param name a name for this producer
+ * @param producerParameters the parameters to initialise this producer
+ * @throws ApexEventException exception on errors initializing an event producer
+ */
+ void init(String name, EventHandlerParameters producerParameters) throws ApexEventException;
+
+ /**
+ * Get the peered reference object for this producer.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @return the peered reference object for this producer
+ */
+ PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode);
+
+ /**
+ * Set the peered reference object for this producer.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @param peeredReference the peered reference object for this producer
+ */
+ void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference);
+
+ /**
+ * Send an event to the producer.
+ *
+ * @param executionId the unique ID that produced this event
+ * @param eventName The name of the event
+ * @param event The converted event as an object
+ */
+ void sendEvent(long executionId, String eventName, Object event);
+
+ /**
+ * Get the name of this event producer.
+ *
+ * @return the event producer name
+ */
+ String getName();
+
+ /**
+ * Stop the event producer.
+ */
+ void stop();
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java
new file mode 100644
index 000000000..ec19e65c3
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventProtocolConverter.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+
+/**
+ * The Interface ApexEventProtocolConverter extends ApexEventConverter to allow
+ * EventProtocolParameters conversion parameters to be passed to the converter.
+ *
+ * @author John Keeney (john.keeney@ericsson.com)
+ */
+public interface ApexEventProtocolConverter extends ApexEventConverter {
+
+ /**
+ * Initialise the converter instance with the parameters for the EventProtocol.
+ *
+ * @param parameters the parameters for the EventProtocol
+ */
+ void init(EventProtocolParameters parameters);
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java
new file mode 100644
index 000000000..8d7e7bae5
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventReceiver.java
@@ -0,0 +1,46 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+/**
+ * This interface is used by an Apex event consumer {@link ApexEventConsumer} consumer to pass a
+ * received event to Apex.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface ApexEventReceiver {
+ /**
+ * Receive an event from a consumer for processing.
+ *
+ * @param executionId the unique ID for execution of this event
+ * @param event the event to receive
+ * @throws ApexEventException on exceptions receiving an event into Apex
+ */
+ void receiveEvent(long executionId, Object event) throws ApexEventException;
+
+ /**
+ * Receive an event from a consumer for processing.
+ *
+ * @param event the event to receive
+ * @throws ApexEventException on exceptions receiving an event into Apex
+ */
+ void receiveEvent(Object event) throws ApexEventException;
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java
new file mode 100644
index 000000000..1a624face
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexEventRuntimeException.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
+
+/**
+ * This exception will be called if a runtime error occurs in Apex event handling.
+ *
+ * @author Liam Fallon
+ */
+public class ApexEventRuntimeException extends ApexRuntimeException {
+ private static final long serialVersionUID = -8507246953751956974L;
+
+ /**
+ * Instantiates a new apex runtime event exception with a message.
+ *
+ * @param message the message
+ */
+ public ApexEventRuntimeException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Instantiates a new apex runtime event exception with a message and a caused by exception.
+ *
+ * @param message the message
+ * @param e the exception that caused this exception to be thrown
+ */
+ public ApexEventRuntimeException(final String message, final Exception e) {
+ super(message, e);
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java
new file mode 100644
index 000000000..62663b9f1
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/ApexPeriodicEventGenerator.java
@@ -0,0 +1,176 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class is used to generate periodic events into an Apex engine service. It is used to trigger
+ * policies that perform housekeeping operations.
+ *
+ * @author eeilfn
+ */
+public class ApexPeriodicEventGenerator extends TimerTask {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexPeriodicEventGenerator.class);
+
+ /** The name of the periodic event. */
+ public static final String PERIODIC_EVENT_NAME = "PERIODIC_EVENT";
+
+ /** The version of the periodic event. */
+ public static final String PERIODIC_EVENT_VERSION = "0.0.1";
+
+ /** The name space of the periodic event. */
+ public static final String PERIODIC_EVENT_NAMESPACE = "com.ericsson.apex.service.engine.event";
+
+ /** The source of the periodic event. */
+ public static final String PERIODIC_EVENT_SOURCE = "internal";
+
+ /** The target of the periodic event. */
+ public static final String PERIODIC_EVENT_TARGET = "internal";
+
+ /**
+ * The field name in the periodic event for the delay between occurrences of the periodic event.
+ */
+ public static final String PERIODIC_DELAY = "PERIODIC_DELAY";
+
+ /**
+ * The field name in the periodic event for the time at which the first periodic event will
+ * occur.
+ */
+ public static final String PERIODIC_FIRST_TIME = "PERIODIC_FIRST_TIME";
+
+ /**
+ * The field name in the periodic event for the time at which the last periodic event will
+ * occur.
+ */
+ public static final String PERIODIC_LAST_TIME = "PERIODIC_LAST_TIME";
+
+ /** The field name in the periodic event for the time at which the event was sent. */
+ public static final String PERIODIC_CURRENT_TIME = "PERIODIC_CURRENT_TIME";
+
+ /**
+ * The field name in the periodic event for the number of occurrences of this event that have
+ * been sent to date, this is a sequence number for the periodic event.
+ */
+ public static final String PERIODIC_EVENT_COUNT = "PERIODIC_EVENT_COUNT";
+
+ // The Java timer used to send periodic events
+ private Timer timer = null;
+
+ // The engine service interface we'll send periodic events to
+ private final EngineServiceEventInterface engineServiceEventInterface;
+
+ // Timing information
+ private long period = 0;
+ private long firstEventTime = 0;
+ private long lastEventTime = 0;
+ private long eventCount = 0;
+
+ /**
+ * Constructor, save a reference to the event stream handler.
+ *
+ * @param engineServiceEventInterface the engine service event interface on which to send
+ * periodic events
+ * @param period The period in milliseconds between events
+ */
+ public ApexPeriodicEventGenerator(final EngineServiceEventInterface engineServiceEventInterface,
+ final long period) {
+ // Save the engine service reference and delay
+ this.engineServiceEventInterface = engineServiceEventInterface;
+ this.period = period;
+
+ timer = new Timer(ApexPeriodicEventGenerator.class.getSimpleName(), true);
+ timer.schedule(this, period, period);
+ }
+
+ /**
+ * Output the metrics for stream loading.
+ */
+ @Override
+ public void run() {
+ final Map<String, Object> periodicEventMap = new HashMap<>();
+
+ // Record the current event time
+ final long currentEventTime = System.currentTimeMillis();
+
+ // Check if this is the first periodic event
+ if (firstEventTime == 0) {
+ firstEventTime = currentEventTime;
+ lastEventTime = currentEventTime;
+ }
+
+ // Increment the event counter
+ eventCount++;
+
+ // Set the fields in the periodic event
+ periodicEventMap.put(PERIODIC_DELAY, period);
+ periodicEventMap.put(PERIODIC_FIRST_TIME, firstEventTime);
+ periodicEventMap.put(PERIODIC_LAST_TIME, lastEventTime);
+ periodicEventMap.put(PERIODIC_CURRENT_TIME, currentEventTime);
+ periodicEventMap.put(PERIODIC_EVENT_COUNT, eventCount);
+
+ // Send the periodic event
+ try {
+ final ApexEvent periodicEvent = new ApexEvent(PERIODIC_EVENT_NAME, PERIODIC_EVENT_VERSION,
+ PERIODIC_EVENT_NAMESPACE, PERIODIC_EVENT_SOURCE, PERIODIC_EVENT_TARGET);
+ periodicEvent.putAll(periodicEventMap);
+ engineServiceEventInterface.sendEvent(periodicEvent);
+ } catch (final ApexEventException e) {
+ LOGGER.warn("could not send Apex periodic event " + PERIODIC_EVENT_NAME + ":" + PERIODIC_EVENT_VERSION, e);
+ return;
+ }
+
+ // Save the current time as the last time
+ lastEventTime = currentEventTime;
+ }
+
+ /**
+ * Cancel the timer.
+ *
+ * @return true, if cancel
+ */
+ @Override
+ public boolean cancel() {
+ // Cancel the timer
+ if (timer != null) {
+ timer.cancel();
+ }
+ return true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "ApexPeriodicEventGenerator [period=" + period + ", firstEventTime=" + firstEventTime
+ + ", lastEventTime=" + lastEventTime + ", eventCount=" + eventCount + "]";
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java
new file mode 100644
index 000000000..9560a834c
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/PeeredReference.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+
+/**
+ * This class holds a reference to an event consumer and producer that have been peered.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class PeeredReference {
+ // The consumer putting events into APEX
+ private final ApexEventConsumer peeredConsumer;
+
+ // The synchronous producer taking events out of APEX
+ private final ApexEventProducer peeredProducer;
+
+ /**
+ * Create a peered consumer/producer reference
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @param consumer the consumer that is receiving event
+ * @param producer the producer that is sending events
+ */
+ public PeeredReference(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer, final ApexEventProducer producer) {
+ this.peeredConsumer = consumer;
+ this.peeredProducer = producer;
+
+ // Set the peered reference on the producer and consumer
+ peeredConsumer.setPeeredReference(peeredMode, this);
+ peeredProducer.setPeeredReference(peeredMode, this);
+ }
+
+ /**
+ * Gets the synchronous consumer putting events into the cache.
+ *
+ * @return the source synchronous consumer
+ */
+ public ApexEventConsumer getPeeredConsumer() {
+ return peeredConsumer;
+ }
+
+ /**
+ * Gets the synchronous producer taking events from the cache.
+ *
+ * @return the synchronous producer that is taking events from the cache
+ */
+ public ApexEventProducer getPeeredProducer() {
+ return peeredProducer;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
new file mode 100644
index 000000000..25f92d843
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/SynchronousEventCache.java
@@ -0,0 +1,294 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This class holds a cache of the synchronous events sent into Apex and that have not yet been replied to. It runs a thread to time out events that have not
+ * been replied to in the specified timeout.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class SynchronousEventCache extends PeeredReference implements Runnable {
+ // Get a reference to the logger
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(SynchronousEventCache.class);
+
+ // The default amount of time to wait for a synchronous event to be replied to is 1 second
+ private static final long DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT = 1000;
+
+ // The timeout to wait between event polls in milliseconds and the time to wait for the thread to stop
+ private static final long OUTSTANDING_EVENT_POLL_TIMEOUT = 50;
+ private static final long CACHE_STOP_WAIT_INTERVAL = 10;
+
+ // The time in milliseconds to wait for the reply to a sent synchronous event
+ private long synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
+
+ // Map holding outstanding synchronous events
+ private final Map<Long, SimpleEntry<Long, Object>> toApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
+
+ // Map holding reply events
+ private final Map<Long, SimpleEntry<Long, Object>> fromApexEventMap = new HashMap<Long, SimpleEntry<Long, Object>>();
+
+ // The message listener thread and stopping flag
+ private final Thread synchronousEventCacheThread;
+ private boolean stopOrderedFlag = false;
+
+ /**
+ * Create a synchronous event cache that caches outstanding synchronous Apex events.
+ *
+ * @param peeredMode the peered mode for which to return the reference
+ * @param consumer the consumer that is populating the cache
+ * @param producer the producer that is emptying the cache
+ * @param synchronousEventTimeout the time in milliseconds to wait for the reply to a sent synchronous event
+ */
+ public SynchronousEventCache(final EventHandlerPeeredMode peeredMode, final ApexEventConsumer consumer, final ApexEventProducer producer, final long synchronousEventTimeout) {
+ super(peeredMode, consumer, producer);
+
+ if (synchronousEventTimeout != 0) {
+ this.synchronousEventTimeout = synchronousEventTimeout;
+ }
+ else {
+ this.synchronousEventTimeout = DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT;
+ }
+
+ // Start scanning the outstanding events
+ synchronousEventCacheThread = new Thread(this);
+ synchronousEventCacheThread.setDaemon(true);
+ synchronousEventCacheThread.start();
+ }
+
+ /**
+ * Gets the timeout value for synchronous events.
+ *
+ * @return the synchronous event timeout
+ */
+ public long getSynchronousEventTimeout() {
+ return synchronousEventTimeout;
+ }
+
+ /**
+ * Cache a synchronized event sent into Apex in the event cache.
+ *
+ * @param executionId the execution ID that was assigned to the event
+ * @param event the apex event
+ */
+ public void cacheSynchronizedEventToApex(final long executionId, final Object event) {
+ // Add the event to the map
+ synchronized (toApexEventMap) {
+ cacheSynchronizedEvent(toApexEventMap, executionId, event);
+ }
+ }
+
+ /**
+ * Remove the record of an event sent to Apex if it exists in the cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ public Object removeCachedEventToApexIfExists(final long executionId) {
+ synchronized (toApexEventMap) {
+ return removeCachedEventIfExists(toApexEventMap, executionId);
+ }
+ }
+
+ /**
+ * Check if an event exists in the to apex cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return true if the event exists, false otherwise
+ */
+ public boolean existsEventToApex(final long executionId) {
+ synchronized (toApexEventMap) {
+ return toApexEventMap.containsKey(executionId);
+ }
+ }
+
+ /**
+ * Cache synchronized event received from Apex in the event cache.
+ *
+ * @param executionId the execution ID of the event
+ * @param event the apex event
+ */
+ public void cacheSynchronizedEventFromApex(final long executionId, final Object event) {
+ // Add the event to the map
+ synchronized (fromApexEventMap) {
+ cacheSynchronizedEvent(fromApexEventMap, executionId, event);
+ }
+ }
+
+ /**
+ * Remove the record of an event received from Apex if it exists in the cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ public Object removeCachedEventFromApexIfExists(final long executionId) {
+ synchronized (fromApexEventMap) {
+ return removeCachedEventIfExists(fromApexEventMap, executionId);
+ }
+ }
+
+ /**
+ * Check if an event exists in the from apex cache.
+ *
+ * @param executionId the execution ID of the event
+ * @return true if the event exists, false otherwise
+ */
+ public boolean existsEventFromApex(final long executionId) {
+ synchronized (fromApexEventMap) {
+ return fromApexEventMap.containsKey(executionId);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ LOGGER.entry();
+
+ // Periodic scan of outstanding events
+ while (synchronousEventCacheThread.isAlive() && !stopOrderedFlag) {
+ ThreadUtilities.sleep(OUTSTANDING_EVENT_POLL_TIMEOUT);
+
+ // Check for timeouts on events
+ synchronized (toApexEventMap) {
+ timeoutEventsOnCache(toApexEventMap);
+ }
+ synchronized (fromApexEventMap) {
+ timeoutEventsOnCache(fromApexEventMap);
+ }
+ }
+
+ LOGGER.exit();
+ }
+
+ /**
+ * Stops the scanning thread and clears the cache.
+ */
+ public synchronized void stop() {
+ LOGGER.entry();
+ stopOrderedFlag = true;
+
+ while (synchronousEventCacheThread.isAlive()) {
+ ThreadUtilities.sleep(CACHE_STOP_WAIT_INTERVAL);
+ }
+
+ // Check if there are any unprocessed events
+ if (!toApexEventMap.isEmpty()) {
+ LOGGER.warn(toApexEventMap.size() + " synchronous events dropped due to system shutdown");
+ }
+
+ toApexEventMap.clear();
+ LOGGER.exit();
+ }
+
+ /**
+ * Cache a synchronized event sent in an event cache.
+ * @param eventCacheMap the map to cache the event on
+ * @param executionId the execution ID of the event
+ * @param event the event to cache
+ */
+ private void cacheSynchronizedEvent(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId, final Object event) {
+ LOGGER.entry("Adding event with execution ID: " + executionId);
+
+ // Check if the event is already in the cache
+ if (eventCacheMap.containsKey(executionId)) {
+ // If there was no sent event then the event timed out or some unexpected event was received
+ final String errorMessage = "an event with ID " + executionId
+ + " already exists in the synchronous event cache, execution IDs must be unique in the system";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Add the event to the map
+ eventCacheMap.put(executionId, new SimpleEntry<Long, Object>(System.currentTimeMillis(), event));
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event has been cached:" + event);
+ }
+
+ LOGGER.exit("Added: " + executionId);
+ }
+
+ /**
+ * Remove the record of an event if it exists in the cache.
+ *
+ * @param eventCacheMap the map to remove the event from
+ * @param executionId the execution ID of the event
+ * @return The removed event
+ */
+ private Object removeCachedEventIfExists(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap, final long executionId) {
+ LOGGER.entry("Removing: " + executionId);
+
+ final SimpleEntry<Long, Object> removedEventEntry = eventCacheMap.remove(executionId);
+
+ if (removedEventEntry != null) {
+ LOGGER.exit("Removed: " + executionId);
+ return removedEventEntry.getValue();
+ }
+ else {
+ // The event may not be one of the events in our cache, so we just ignore removal failures
+ return null;
+ }
+ }
+
+ /**
+ * Time out events on an event cache map. Events that have a timeout longer than the configured timeout are timed out.
+ * @param eventCacheMap the event cache to operate on
+ */
+ private void timeoutEventsOnCache(final Map<Long, SimpleEntry<Long, Object>> eventCacheMap) {
+ // Use a set to keep track of the events that have timed out
+ final Set<Long> timedOutEventSet = new HashSet<>();
+
+ for (final Entry<Long, SimpleEntry<Long, Object>> cachedEventEntry : eventCacheMap.entrySet()) {
+ // The amount of time we are waiting for the event reply
+ final long eventWaitTime = System.currentTimeMillis() - cachedEventEntry.getValue().getKey();
+
+ // Have we a timeout?
+ if (eventWaitTime > synchronousEventTimeout) {
+ timedOutEventSet.add(cachedEventEntry.getKey());
+ }
+ }
+
+ // Remove timed out events from the map
+ for (final long timedoutEventExecutionID : timedOutEventSet) {
+ // Remove the map entry and issue a warning
+ final SimpleEntry<Long, Object> timedOutEventEntry = eventCacheMap.remove(timedoutEventExecutionID);
+
+ LOGGER.warn("synchronous event timed out, reply not received in " + synchronousEventTimeout + " milliseconds on event "
+ + timedOutEventEntry.getValue());
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java
new file mode 100644
index 000000000..8f54c049b
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl;
+
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This factory class creates event consumers of various technology types for Apex engines.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EventConsumerFactory {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventConsumerFactory.class);
+
+ /**
+ * Empty constructor with no generic overloading.
+ */
+ public EventConsumerFactory() {}
+
+ /**
+ * Create an event consumer of the required type for the specified consumer technology.
+ *
+ * @param name the name of the consumer
+ * @param consumerParameters The parameters for the Apex engine, we use the technology type of
+ * the required consumer
+ * @return the event consumer
+ * @throws ApexEventException on errors creating the Apex event consumer
+ */
+ public ApexEventConsumer createConsumer(final String name, final EventHandlerParameters consumerParameters)
+ throws ApexEventException {
+ // Get the carrier technology parameters
+ final CarrierTechnologyParameters technologyParameters = consumerParameters.getCarrierTechnologyParameters();
+
+ // Get the class for the event consumer using reflection
+ final String consumerPluginClass = technologyParameters.getEventConsumerPluginClass();
+ Object consumerPluginObject = null;
+ try {
+ consumerPluginObject = Class.forName(consumerPluginClass).newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ final String errorMessage = "could not create an Apex event consumer for \"" + name
+ + "\" for the carrier technology \"" + technologyParameters.getLabel()
+ + "\", specified event consumer plugin class \"" + consumerPluginClass + "\" not found";
+ LOGGER.error(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Check the class is an event consumer
+ if (!(consumerPluginObject instanceof ApexEventConsumer)) {
+ final String errorMessage = "could not create an Apex event consumer \"" + name
+ + "\" for the carrier technology \"" + technologyParameters.getLabel()
+ + "\", specified event consumer plugin class \"" + consumerPluginClass
+ + "\" is not an instance of \"" + ApexEventConsumer.class.getCanonicalName() + "\"";
+ LOGGER.error(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ return (ApexEventConsumer) consumerPluginObject;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java
new file mode 100644
index 000000000..9bbbad362
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java
@@ -0,0 +1,82 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl;
+
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProducer;
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This factory class creates event producers for the defined technology type for Apex engines.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EventProducerFactory {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProducerFactory.class);
+
+ /**
+ * Empty constructor with no generic overloading.
+ */
+ public EventProducerFactory() {}
+
+ /**
+ * Create an event producer of the required type for the specified producer technology.
+ *
+ * @param name the name of the producer
+ * @param producerParameters The Apex parameters containing the configuration for the producer
+ * @return the event producer
+ * @throws ApexEventException on errors creating the Apex event producer
+ */
+ public ApexEventProducer createProducer(final String name, final EventHandlerParameters producerParameters)
+ throws ApexEventException {
+ // Get the carrier technology parameters
+ final CarrierTechnologyParameters technologyParameters = producerParameters.getCarrierTechnologyParameters();
+
+ // Get the class for the event producer using reflection
+ final String producerPluginClass = technologyParameters.getEventProducerPluginClass();
+ Object producerPluginObject = null;
+ try {
+ producerPluginObject = Class.forName(producerPluginClass).newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ final String errorMessage = "could not create an Apex event producer for Producer \"" + name
+ + "\" for the carrier technology \"" + technologyParameters.getLabel()
+ + "\", specified event producer plugin class \"" + producerPluginClass + "\" not found";
+ LOGGER.error(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ // Check the class is an event producer
+ if (!(producerPluginObject instanceof ApexEventProducer)) {
+ final String errorMessage = "could not create an Apex event producer for Producer \"" + name
+ + "\" for the carrier technology \"" + technologyParameters.getLabel()
+ + "\", specified event producer plugin class \"" + producerPluginClass
+ + "\" is not an instance of \"" + ApexEventProducer.class.getCanonicalName() + "\"";
+ LOGGER.error(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ return (ApexEventProducer) producerPluginObject;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java
new file mode 100644
index 000000000..85c5bf03f
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl;
+
+import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This factory class uses the Apex event protocol parameters to create and return an instance of
+ * the correct Apex event protocol converter plugin for the specified event protocol.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EventProtocolFactory {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProtocolFactory.class);
+
+ /**
+ * Create an event converter that converts between an
+ * {@link org.onap.policy.apex.service.engine.event.ApexEvent} and the specified event protocol.
+ *
+ * @param name the name of the event protocol
+ * @param eventProtocolParameters the event protocol parameters defining what to convert from
+ * and to
+ * @return The event converter for converting events to and from Apex format
+ */
+ public ApexEventProtocolConverter createConverter(final String name,
+ final EventProtocolParameters eventProtocolParameters) {
+ // Get the class for the event protocol plugin using reflection
+ final String eventProtocolPluginClass = eventProtocolParameters.getEventProtocolPluginClass();
+ Object eventProtocolPluginObject = null;
+ try {
+ eventProtocolPluginObject = Class.forName(eventProtocolPluginClass).newInstance();
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+ final String errorMessage = "could not create an Apex event protocol converter for \"" + name
+ + "\" for the protocol \"" + eventProtocolParameters.getLabel()
+ + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass
+ + "\" not found";
+ LOGGER.error(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ // Check the class is an event consumer
+ if (!(eventProtocolPluginObject instanceof ApexEventProtocolConverter)) {
+ final String errorMessage = "could not create an Apex event protocol converter for \"" + name
+ + "\" for the protocol \"" + eventProtocolParameters.getLabel()
+ + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass
+ + "\" is not an instance of \"" + ApexEventProtocolConverter.class.getCanonicalName() + "\"";
+ LOGGER.error(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ ((ApexEventProtocolConverter) eventProtocolPluginObject).init(eventProtocolParameters);
+ return (ApexEventProtocolConverter) eventProtocolPluginObject;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java
new file mode 100644
index 000000000..b73aeb567
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java
@@ -0,0 +1,140 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.policy.apex.service.engine.event.ApexEvent;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventList;
+import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class Apex2ApexEventConverter passes through {@link ApexEvent} instances. It is used for
+ * transferring Apex events directly as POJOs between APEX producers and consumers.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class Apex2ApexEventConverter implements ApexEventProtocolConverter {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2ApexEventConverter.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy.
+ * apex. service.parameters.eventprotocol.EventProtocolParameters)
+ */
+ @Override
+ public void init(final EventProtocolParameters parameters) {
+ // Check and get the APEX parameters
+ if (!(parameters instanceof ApexEventProtocolParameters)) {
+ final String errorMessage = "specified consumer properties are not applicable to the APEX event protocol";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String,
+ * java.lang.Object)
+ */
+ @Override
+ public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException {
+ // Check the event eventObject
+ if (eventObject == null) {
+ LOGGER.warn("event processing failed, event is null");
+ throw new ApexEventException("event processing failed, event is null");
+ }
+
+ // The list of events we will return
+ final List<ApexEvent> eventList = new ArrayList<>();
+
+ try {
+ // Check if its a single APEX event
+ if (!(eventObject instanceof ApexEvent)) {
+ throw new ApexEventException("incoming event (" + eventObject + ") is not an ApexEvent");
+ }
+
+ final ApexEvent event = (ApexEvent) eventObject;
+
+ // Check whether we have any ApexEventList fields, if so this is an event of events and
+ // all fields should be of type ApexEventList
+ boolean foundEventListFields = false;
+ boolean foundOtherFields = false;
+ for (final Object fieldObject : event.values()) {
+ if (fieldObject instanceof ApexEventList) {
+ foundEventListFields = true;
+
+ // Add the events to the event list
+ eventList.addAll((ApexEventList) fieldObject);
+ } else {
+ foundOtherFields = true;
+ }
+ }
+
+ // If we found both event list fields and other fields we're in trouble
+ if (foundEventListFields && foundOtherFields) {
+ throw new ApexEventException("incoming event (" + eventObject
+ + ") has both event list fields and other fields, it cannot be processed");
+ }
+
+ // Check if the incoming event just has other fields, if so it's just a regular event
+ // and we add it to the event list as the only event there
+ if (foundOtherFields) {
+ eventList.add(event);
+ }
+ } catch (final Exception e) {
+ final String errorString = "Failed to unmarshal APEX event: " + e.getMessage() + ", event=" + eventObject;
+ LOGGER.warn(errorString, e);
+ throw new ApexEventException(errorString, e);
+ }
+
+ // Return the list of events we have unmarshalled
+ return eventList;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy.
+ * apex.service.engine.event.ApexEvent)
+ */
+ @Override
+ public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException {
+ // Check the Apex event
+ if (apexEvent == null) {
+ LOGGER.warn("event processing failed, Apex event is null");
+ throw new ApexEventException("event processing failed, Apex event is null");
+ }
+
+ return apexEvent;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java
new file mode 100644
index 000000000..10cd58eb7
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin;
+
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+
+/**
+ * Event protocol parameters for JSON as an event protocol, there are no user defined parameters.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexEventProtocolParameters extends EventProtocolParameters {
+ /** The label of this event protocol. */
+ public static final String APEX_EVENT_PROTOCOL_LABEL = "APEX";
+
+ /**
+ * Constructor to create a JSON event protocol parameter instance and register the instance with
+ * the parameter service.
+ */
+ public ApexEventProtocolParameters() {
+ this(ApexEventProtocolParameters.class.getCanonicalName(), APEX_EVENT_PROTOCOL_LABEL);
+ }
+
+ /**
+ * Constructor to create an event protocol parameters instance with the name of a sub class of
+ * this class.
+ *
+ * @param parameterClassName the class name of a sub class of this class
+ * @param eventProtocolLabel the name of the event protocol for this plugin
+ */
+ public ApexEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) {
+ super(parameterClassName);
+
+ // Set the event protocol properties for the JSON event protocol
+ this.setLabel(eventProtocolLabel);
+
+ // Set the event protocol plugin class
+ this.setEventProtocolPluginClass(Apex2ApexEventConverter.class.getCanonicalName());
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java
new file mode 100644
index 000000000..a3c7d0d79
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Contains the implementation of the APEX event protocol converter plugin for events in Json
+ * format.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java
new file mode 100644
index 000000000..90a19fff2
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.enevent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.policy.apex.service.engine.event.ApexEvent;
+import org.onap.policy.apex.service.engine.event.ApexEventConverter;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+import org.onap.policy.apex.core.engine.engine.ApexEngine;
+import org.onap.policy.apex.core.engine.event.EnEvent;
+import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+
+/**
+ * The Class ApexEvent2EnEventConverter converts externally facing {@link ApexEvent} instances to
+ * and from instances of {@link EnEvent} that are used internally in the Apex engine core.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public final class ApexEvent2EnEventConverter implements ApexEventConverter {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent2EnEventConverter.class);
+
+ // The Apex engine with its event definitions
+ private final ApexEngine apexEngine;
+
+ /**
+ * Set up the event converter.
+ *
+ * @param apexEngine The engine to use to create events to be converted
+ */
+ public ApexEvent2EnEventConverter(final ApexEngine apexEngine) {
+ this.apexEngine = apexEngine;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String,
+ * java.lang.Object)
+ */
+ @Override
+ public List<ApexEvent> toApexEvent(final String eventName, final Object event) throws ApexException {
+ // Check the Engine event
+ if (event == null) {
+ LOGGER.warn("event processing failed, engine event is null");
+ throw new ApexEventException("event processing failed, engine event is null");
+ }
+
+ // Cast the event to an Engine event event, if our conversion is correctly configured, this
+ // cast should always work
+ EnEvent enEvent = null;
+ try {
+ enEvent = (EnEvent) event;
+ } catch (final Exception e) {
+ final String errorMessage = "error transferring event \"" + event + "\" to the Apex engine";
+ LOGGER.debug(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ // Create the Apex event
+ final AxEvent axEvent = enEvent.getAxEvent();
+ final ApexEvent apexEvent = new ApexEvent(axEvent.getKey().getName(), axEvent.getKey().getVersion(),
+ axEvent.getNameSpace(), axEvent.getSource(), axEvent.getTarget());
+
+ // Copy the ExecutionID from the EnEvent into the ApexEvent
+ apexEvent.setExecutionID(enEvent.getExecutionID());
+
+ // Copy he exception message to the Apex event if it is set
+ if (enEvent.getExceptionMessage() != null) {
+ apexEvent.setExceptionMessage(enEvent.getExceptionMessage());
+ }
+
+ // Set the data on the apex event
+ apexEvent.putAll(enEvent);
+
+ // Return the event in a single element
+ final ArrayList<ApexEvent> eventList = new ArrayList<ApexEvent>();
+ eventList.add(apexEvent);
+ return eventList;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy.
+ * apex.service.engine.event.ApexEvent)
+ */
+ @Override
+ public EnEvent fromApexEvent(final ApexEvent apexEvent) throws ApexException {
+ // Check the Apex model
+ if (apexEngine == null) {
+ LOGGER.warn("event processing failed, apex engine is null");
+ throw new ApexEventException("event processing failed, apex engine is null");
+ }
+
+ // Get the event definition
+ final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(apexEvent.getName());
+ if (eventDefinition == null) {
+ LOGGER.warn("event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model");
+ throw new ApexEventException(
+ "event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model");
+ }
+
+ // Create the internal engine event
+ final EnEvent enEvent = apexEngine.createEvent(eventDefinition.getKey());
+
+ // Set the data on the engine event
+ enEvent.putAll(apexEvent);
+
+ // copy the ExecutionID from the ApexEvent into the EnEvent
+ enEvent.setExecutionID(apexEvent.getExecutionID());
+
+ return enEvent;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java
new file mode 100644
index 000000000..6bc6bc2b3
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides conversion between externally facing
+ * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal
+ * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.enevent;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java
new file mode 100644
index 000000000..fb722ea2f
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
+
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+
+/**
+ * This class holds the parameters that allows an output event to to be sent back into APEX as one
+ * or multiple input events, there are no user defined parameters.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EventRequestorCarrierTechnologyParameters extends CarrierTechnologyParameters {
+ // @formatter:off
+ /** The label of this carrier technology. */
+ public static final String EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL = "EVENT_REQUESTOR";
+
+ /** The producer plugin class for the EVENT_REQUESTOR carrier technology. */
+ public static final String EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS =
+ EventRequestorProducer.class.getCanonicalName();
+
+ /** The consumer plugin class for the EVENT_REQUESTOR carrier technology. */
+ public static final String EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS =
+ EventRequestorConsumer.class.getCanonicalName();
+ // @formatter:on
+
+ /**
+ * Constructor to create an event requestor carrier technology parameters instance and register
+ * the instance with the parameter service.
+ */
+ public EventRequestorCarrierTechnologyParameters() {
+ super(EventRequestorCarrierTechnologyParameters.class.getCanonicalName());
+
+ // Set the carrier technology properties for the EVENT_REQUESTOR carrier technology
+ this.setLabel(EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL);
+ this.setEventProducerPluginClass(EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS);
+ this.setEventConsumerPluginClass(EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
+ */
+ @Override
+ public String validate() {
+ return "";
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java
new file mode 100644
index 000000000..b472cc9c7
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java
@@ -0,0 +1,218 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an Apex event consumer that receives events from its peered event requestor
+ * producer.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class EventRequestorConsumer implements ApexEventConsumer, Runnable {
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class);
+
+ // The amount of time to wait in milliseconds between checks that the consumer thread has
+ // stopped
+ private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50;
+
+ // The event receiver that will receive events from this consumer
+ private ApexEventReceiver eventReceiver;
+
+ // The name for this consumer
+ private String name = null;
+
+ // The peer references for this event handler
+ private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
+ new EnumMap<>(EventHandlerPeeredMode.class);
+
+ // Temporary request holder for incoming event send requests
+ private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>();
+
+ // The consumer thread and stopping flag
+ private Thread consumerThread;
+ private boolean stopOrderedFlag = false;
+
+ // The number of events received to date
+ private int eventsReceived = 0;
+
+ @Override
+ public void init(final String consumerName, final EventHandlerParameters consumerParameters,
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ this.eventReceiver = incomingEventReceiver;
+ this.name = consumerName;
+
+ // Check and get the event requestor consumer properties
+ if (!(consumerParameters
+ .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
+ final String errorMessage =
+ "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ // Check if we are in peered mode
+ if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
+ final String errorMessage = "event Requestor consumer (" + this.name
+ + ") must run in peered requestor mode with a event Requestor producer";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ }
+
+ /**
+ * Receive an incoming event send request from the peered event Requestor producer and queue it
+ *
+ * @param eventObject the incoming event to process
+ * @throws ApexEventRuntimeException on queueing errors
+ */
+ public void processEvent(final Object eventObject) {
+ // Push the event onto the queue for handling
+ try {
+ incomingEventRequestQueue.add(eventObject);
+ } catch (final Exception e) {
+ final String errorMessage =
+ "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
+ */
+ @Override
+ public void start() {
+ // Configure and start the event reception thread
+ final String threadName = this.getClass().getName() + ":" + this.name;
+ consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the number of events received to date
+ *
+ * @return the number of events received
+ */
+ public int getEventsReceived() {
+ return eventsReceived;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
+ * org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // The endless loop that receives events using REST calls
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ // Take the next event from the queue
+ final Object eventObject =
+ incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS);
+ if (eventObject == null) {
+ // Poll timed out, wait again
+ continue;
+ }
+
+ // Send the event into Apex
+ eventReceiver.receiveEvent(eventObject);
+
+ eventsReceived++;
+ } catch (final InterruptedException e) {
+ LOGGER.debug("Thread interrupted, Reason {}", e.getMessage());
+ Thread.currentThread().interrupt();
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
+ }
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventConsumer#stop()
+ */
+ @Override
+ public void stop() {
+ stopOrderedFlag = true;
+
+ while (consumerThread.isAlive()) {
+ ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME);
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java
new file mode 100644
index 000000000..4a972f2ce
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java
@@ -0,0 +1,178 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProducer;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concrete implementation of an Apex event producer that sends one or more events to its peered
+ * event requestor consumer.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ *
+ */
+public class EventRequestorProducer implements ApexEventProducer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorProducer.class);
+
+ // The name for this producer
+ private String name = null;
+
+ // The peer references for this event handler
+ private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
+ new EnumMap<>(EventHandlerPeeredMode.class);
+
+ // The number of events sent
+ private int eventsSent = 0;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String,
+ * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters)
+ */
+ @Override
+ public void init(final String producerName, final EventHandlerParameters producerParameters)
+ throws ApexEventException {
+ this.name = producerName;
+
+ // Check and get the producer Properties
+ if (!(producerParameters
+ .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) {
+ final String errorMessage =
+ "specified consumer properties are not applicable to event requestor producer (" + this.name + ")";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ // Check if we are in peered mode
+ if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
+ final String errorMessage = "Event Requestor producer (" + this.name
+ + ") must run in peered requestor mode with a Event Requestor consumer";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the number of events sent to date
+ *
+ * @return the number of events received
+ */
+ public int getEventsSent() {
+ return eventsSent;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
+ * org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang.
+ * String, java.lang.Object)
+ */
+ @Override
+ public void sendEvent(final long executionId, final String eventName, final Object eventObject) {
+ // Check if this is a synchronized event, if so we have received a reply
+ final SynchronousEventCache synchronousEventCache =
+ (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
+ if (synchronousEventCache != null) {
+ synchronousEventCache.removeCachedEventToApexIfExists(executionId);
+ }
+
+ // Find the peered consumer for this producer
+ final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
+ if (peeredRequestorReference != null) {
+ // Find the event Response Consumer that will handle this request
+ final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
+ if (!(consumer instanceof EventRequestorConsumer)) {
+ final String errorMessage = "send of event to event consumer \""
+ + peeredRequestorReference.getPeeredConsumer() + "\" failed,"
+ + " event response consumer is not an instance of EventRequestorConsumer\n" + eventObject;
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Use the consumer to handle this event
+ final EventRequestorConsumer eventRequstConsumer = (EventRequestorConsumer) consumer;
+ eventRequstConsumer.processEvent(eventObject);
+
+ eventsSent++;
+ } else {
+ // No peered consumer defined
+ final String errorMessage = "send of event failed, event response consumer is not defined\n" + eventObject;
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ // For event requestor, all the implementation is in the consumer
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java
new file mode 100644
index 000000000..3b6da08a4
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java
@@ -0,0 +1,26 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the Event Requestor carrier technology for multiple event input from an output event.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.eventrequestor;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java
new file mode 100644
index 000000000..f7f25cb9e
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java
@@ -0,0 +1,208 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin;
+
+import org.onap.policy.apex.model.utilities.ResourceUtils;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.ApexFileEventConsumer;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer.ApexFileEventProducer;
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+
+/**
+ * This class holds the parameters that allows transport of events into and out of Apex using files
+ * and standard input and output.
+ *
+ * <p>
+ * The following parameters are defined:
+ * <ol>
+ * <li>fileName: The full path to the file from which to read events or to which to write events.
+ * <li>standardIO: If this flag is set to true, then standard input is used to read events in or
+ * standard output is used to write events and the fileName parameter is ignored if present
+ * <li>standardError: If this flag is set to true, then standard error is used to write events
+ * <li>streamingMode: If this flag is set to true, then streaming mode is set for reading events and
+ * event handling will wait on the input stream for events until the stream is closed. If streaming
+ * model is off, then event reading completes when the end of input is detected.
+ * <li>startDelay: The amount of milliseconds to wait at startup startup before processing the first
+ * event.
+ * </ol>
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters {
+ // @formatter:off
+ /** The label of this carrier technology. */
+ public static final String FILE_CARRIER_TECHNOLOGY_LABEL = "FILE";
+
+ /** The producer plugin class for the FILE carrier technology. */
+ public static final String FILE_EVENT_PRODUCER_PLUGIN_CLASS = ApexFileEventProducer.class.getCanonicalName();
+
+ /** The consumer plugin class for the FILE carrier technology. */
+ public static final String FILE_EVENT_CONSUMER_PLUGIN_CLASS = ApexFileEventConsumer.class.getCanonicalName();
+
+ private String fileName;
+ private boolean standardIO = false;
+ private boolean standardError = false;
+ private boolean streamingMode = false;
+ private long startDelay = 0;
+ // @formatter:on
+
+ /**
+ * Constructor to create a file carrier technology parameters instance and register the instance
+ * with the parameter service.
+ */
+ public FILECarrierTechnologyParameters() {
+ super(FILECarrierTechnologyParameters.class.getCanonicalName());
+
+ // Set the carrier technology properties for the FILE carrier technology
+ this.setLabel(FILE_CARRIER_TECHNOLOGY_LABEL);
+ this.setEventProducerPluginClass(FILE_EVENT_PRODUCER_PLUGIN_CLASS);
+ this.setEventConsumerPluginClass(FILE_EVENT_CONSUMER_PLUGIN_CLASS);
+ }
+
+ /**
+ * Gets the file name from which to read or to which to write events.
+ *
+ * @return the file name from which to read or to which to write events
+ */
+ public String getFileName() {
+ return ResourceUtils.getFilePath4Resource(fileName);
+ }
+
+ /**
+ * Checks if is standard IO should be used for input or output.
+ *
+ * @return true, if standard IO should be used for input or output
+ */
+ public boolean isStandardIO() {
+ return standardIO;
+ }
+
+ /**
+ * Checks if is standard error should be used for output.
+ *
+ * @return true, if standard error should be used for output
+ */
+ public boolean isStandardError() {
+ return standardError;
+ }
+
+ /**
+ * Checks if is streaming mode is on.
+ *
+ * @return true, if streaming mode is on
+ */
+ public boolean isStreamingMode() {
+ return streamingMode;
+ }
+
+ /**
+ * Sets the file name from which to read or to which to write events.
+ *
+ * @param fileName the file name from which to read or to which to write events
+ */
+ public void setFileName(final String fileName) {
+ this.fileName = fileName;
+ }
+
+ /**
+ * Sets if standard IO should be used for event input or output.
+ *
+ * @param standardIO if standard IO should be used for event input or output
+ */
+ public void setStandardIO(final boolean standardIO) {
+ this.standardIO = standardIO;
+ }
+
+ /**
+ * Sets if standard error should be used for event output.
+ *
+ * @param standardError if standard error should be used for event output
+ */
+ public void setStandardError(final boolean standardError) {
+ this.standardError = standardError;
+ }
+
+ /**
+ * Sets streaming mode.
+ *
+ * @param streamingMode the streaming mode value
+ */
+ public void setStreamingMode(final boolean streamingMode) {
+ this.streamingMode = streamingMode;
+ }
+
+ /**
+ * Gets the delay in milliseconds before the plugin starts processing
+ *
+ * @return the delay
+ */
+ public long getStartDelay() {
+ return startDelay;
+ }
+
+ /**
+ * Sets the delay in milliseconds before the plugin starts processing
+ *
+ * @param startDelay the delay
+ */
+ public void setStartDelay(final long startDelay) {
+ this.startDelay = startDelay;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters#
+ * toString()
+ */
+ @Override
+ public String toString() {
+ return "FILECarrierTechnologyParameters [fileName=" + fileName + ", standardIO=" + standardIO
+ + ", standardError=" + standardError + ", streamingMode=" + streamingMode + ", startDelay=" + startDelay
+ + "]";
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
+ */
+ @Override
+ public String validate() {
+ final StringBuilder errorMessageBuilder = new StringBuilder();
+
+ errorMessageBuilder.append(super.validate());
+
+ if (!standardIO && !standardError && (fileName == null || fileName.trim().length() == 0)) {
+ errorMessageBuilder.append(
+ " fileName not specified or is blank or null, it must be specified as a valid file location\n");
+ }
+
+ if (standardIO || standardError) {
+ streamingMode = true;
+ }
+
+ if (startDelay < 0) {
+ errorMessageBuilder.append(" startDelay must be zero or a positive number of milliseconds\n");
+ }
+
+ return errorMessageBuilder.toString();
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
new file mode 100644
index 000000000..7521c3a08
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java
@@ -0,0 +1,247 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concrete implementation an Apex event consumer that reads events from a file. This consumer also
+ * implements ApexEventProducer and therefore can be used as a synchronous consumer.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexFileEventConsumer implements ApexEventConsumer, Runnable {
+
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class);
+
+ // The input stream to read events from
+ private InputStream eventInputStream;
+
+ // The text block reader that will read text blocks from the contents of the file
+ private TextBlockReader textBlockReader;
+
+ // The event receiver that will receive asynchronous events from this consumer
+ private ApexEventReceiver eventReceiver = null;
+
+ // The consumer thread and stopping flag
+ private Thread consumerThread;
+
+ // The name for this consumer
+ private String consumerName = null;
+
+ // The specific carrier technology parameters for this consumer
+ private FILECarrierTechnologyParameters fileCarrierTechnologyParameters;
+
+ // The peer references for this event handler
+ private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
+ new EnumMap<>(EventHandlerPeeredMode.class);
+
+ // Holds the next identifier for event execution.
+ private static AtomicLong nextExecutionID = new AtomicLong(0L);
+
+ /**
+ * Private utility to get the next candidate value for a Execution ID. This value will always be
+ * unique in a single JVM
+ *
+ * @return the next candidate value for a Execution ID
+ */
+ private static synchronized long getNextExecutionID() {
+ return nextExecutionID.getAndIncrement();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps.
+ * uservice.consumer.ApexEventReceiver)
+ */
+ @Override
+ public void init(final String name, final EventHandlerParameters consumerParameters,
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ this.eventReceiver = incomingEventReceiver;
+ this.consumerName = name;
+
+ // Get and check the Apex parameters from the parameter service
+ if (consumerParameters == null) {
+ final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ // Check and get the file Properties
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) {
+ final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName
+ + "\" are not applicable to a File consumer";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ fileCarrierTechnologyParameters =
+ (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+
+ // Open the file producing events
+ try {
+ if (fileCarrierTechnologyParameters.isStandardIO()) {
+ eventInputStream = System.in;
+ } else {
+ eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName());
+ }
+
+ // Get an event composer for our event source
+ textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream,
+ consumerParameters.getEventProtocolParameters());
+ } catch (final IOException e) {
+ final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ if (fileCarrierTechnologyParameters.getStartDelay() > 0) {
+ ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
+ */
+ @Override
+ public String getName() {
+ return consumerName;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
+ * org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
+ */
+ @Override
+ public void start() {
+ // Configure and start the event reception thread
+ final String threadName = this.getClass().getName() + " : " + consumerName;
+ consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // Check that we have been initialized in async or sync mode
+ if (eventReceiver == null) {
+ LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling",
+ consumerName);
+ return;
+ }
+
+ // Read the events from the file while there are still events in the file
+ try {
+ // Read all the text blocks
+ TextBlock textBlock;
+ do {
+ // Read the text block
+ textBlock = textBlockReader.readTextBlock();
+
+ // Process the event from the text block if there is one there
+ if (textBlock.getText() != null) {
+ eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText());
+ }
+ } while (!textBlock.isEndOfText());
+ } catch (final Exception e) {
+ LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ } finally {
+ try {
+ eventInputStream.close();
+ } catch (final IOException e) {
+ LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ }
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ try {
+ eventInputStream.close();
+ } catch (final IOException e) {
+ LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"", e);
+ }
+
+ if (consumerThread.isAlive() && !consumerThread.isInterrupted()) {
+ consumerThread.interrupt();
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java
new file mode 100644
index 000000000..b286f8afe
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java
@@ -0,0 +1,141 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The class CharacterDelimitedTextBlockReader reads the next block of text between two character
+ * tags from an input stream.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class CharacterDelimitedTextBlockReader implements TextBlockReader {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(CharacterDelimitedTextBlockReader.class);
+
+ // The character tags
+ private final char startTagChar;
+ private final char endTagChar;
+
+ // The input stream for text
+ private InputStream inputStream;
+
+ // Flag indicating we have seen EOF on the stream
+ private boolean eofOnInputStream = false;
+
+ /**
+ * Constructor, set the delimiters.
+ *
+ * @param startTagChar The start tag for text blocks
+ * @param endTagChar The end tag for text blocks
+ */
+ public CharacterDelimitedTextBlockReader(final char startTagChar, final char endTagChar) {
+ this.startTagChar = startTagChar;
+ this.endTagChar = endTagChar;
+ }
+
+ /**
+ * Constructor, set the delimiters from a character delimited event protocol parameter class.
+ *
+ * @param charDelimitedParameters the character delimited event protocol parameter class
+ */
+ public CharacterDelimitedTextBlockReader(final EventProtocolTextCharDelimitedParameters charDelimitedParameters) {
+ this.startTagChar = charDelimitedParameters.getStartChar();
+ this.endTagChar = charDelimitedParameters.getEndChar();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#init(
+ * java.io.InputStream)
+ */
+ @Override
+ public void init(final InputStream incomingInputStream) {
+ this.inputStream = incomingInputStream;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
+ * readTextBlock()
+ */
+ @Override
+ public TextBlock readTextBlock() throws IOException {
+ // Check if there was a previous end of a text block with a non-empty text block returned
+ if (eofOnInputStream) {
+ return new TextBlock(eofOnInputStream, null);
+ }
+
+ // The initial nesting level of incoming text blocks is always zero
+ int nestingLevel = 0;
+
+ // Holder for the text block
+ final StringBuilder textBlockBuilder = new StringBuilder();
+
+ // Read the next text block
+ while (true) {
+ final char nextChar = (char) inputStream.read();
+
+ // Check for EOF
+ if (nextChar == (char) -1) {
+ eofOnInputStream = true;
+ break;
+ }
+
+ if (nextChar == startTagChar) {
+ nestingLevel++;
+ } else if (nestingLevel == 0 && !Character.isWhitespace(nextChar)) {
+ LOGGER.warn("invalid input on consumer: " + nextChar);
+ continue;
+ }
+
+ textBlockBuilder.append(nextChar);
+
+ // Check for end of the text block, we have come back to level 0
+ if (nextChar == endTagChar) {
+ if (nestingLevel > 0) {
+ nestingLevel--;
+ }
+
+ if (nestingLevel == 0) {
+ break;
+ }
+ }
+ }
+
+ // Condition the text block and return it
+ final String textBlock = textBlockBuilder.toString().trim();
+ if (textBlock.length() > 0) {
+ return new TextBlock(eofOnInputStream, textBlock);
+ } else {
+ return new TextBlock(eofOnInputStream, null);
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java
new file mode 100644
index 000000000..e40bc756c
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java
@@ -0,0 +1,167 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * The Class TextBlockReader reads the next block of text from an input stream.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class);
+
+ // The amount of time to wait for input on the text block reader
+ private static final long TEXT_BLOCK_DELAY = 250;
+
+ // Tag for the start of a text block
+ private final String blockStartToken;
+
+ // The input stream for text
+ private InputStream inputStream;
+
+ // The lines of input read from the input stream
+ private final Queue<String> textLineQueue = new LinkedBlockingQueue<>();
+
+ // The thread used to read text from the input stream
+ private Thread textConsumputionThread;
+
+ // True while EOF has not been seen on input
+ private boolean eofOnInputStream = false;
+
+ /**
+ * Constructor, initialize the text block reader.
+ *
+ * @param blockStartToken the block start token for the start of a text block
+ */
+ public HeaderDelimitedTextBlockReader(final String blockStartToken) {
+ this.blockStartToken = blockStartToken;
+ }
+
+ /**
+ * Constructor, initialize the text block reader using token delimited event protocol
+ * parameters.
+ *
+ * @param tokenDelimitedParameters the token delimited event protocol parameters
+ */
+ public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) {
+ this.blockStartToken = tokenDelimitedParameters.getDelimiterToken();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
+ * init( java.io.InputStream)
+ */
+ @Override
+ public void init(final InputStream incomingInputStream) {
+ this.inputStream = incomingInputStream;
+
+ // Configure and start the text reading thread
+ textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this);
+ textConsumputionThread.setDaemon(true);
+ textConsumputionThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#
+ * readTextBlock()
+ */
+ @Override
+ public TextBlock readTextBlock() throws IOException {
+ // Holder for the current text block
+ final StringBuilder textBlockBuilder = new StringBuilder();
+
+ // Wait for the timeout period if there is no input
+ if (!eofOnInputStream && textLineQueue.size() == 0) {
+ ThreadUtilities.sleep(TEXT_BLOCK_DELAY);
+ }
+
+ // Scan the lines in the queue
+ while (textLineQueue.size() > 0) {
+ // Scroll down in the available lines looking for the start of the text block
+ if (textLineQueue.peek().startsWith(blockStartToken)) {
+ // Process the input line header
+ textBlockBuilder.append(textLineQueue.remove());
+ textBlockBuilder.append('\n');
+ break;
+ } else {
+ LOGGER.warn("invalid input on consumer: " + textLineQueue.remove());
+ }
+ }
+
+ // Get the rest of the text document
+ while (textLineQueue.size() > 0 && !textLineQueue.peek().startsWith(blockStartToken)) {
+ textBlockBuilder.append(textLineQueue.remove());
+ textBlockBuilder.append('\n');
+ }
+
+ // Condition the text block and return it
+ final String textBlock = textBlockBuilder.toString().trim();
+ final boolean endOfText = (eofOnInputStream && textLineQueue.size() == 0 ? true : false);
+
+ if (textBlock.length() > 0) {
+ return new TextBlock(endOfText, textBlock);
+ } else {
+ return new TextBlock(endOfText, null);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream));
+
+ try {
+ // Read the input line by line until we see end of file on the stream
+ String line;
+ while ((line = textReader.readLine()) != null) {
+ textLineQueue.add(line);
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("I/O exception on text input on consumer: ", e);
+ } finally {
+ eofOnInputStream = true;
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java
new file mode 100644
index 000000000..526d9c318
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+/**
+ * This class is a bean that holds a block of text read from an incoming text file.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class TextBlock {
+ private boolean endOfText = false;
+ private String text;
+
+ /**
+ * Constructor to initiate the text block.
+ *
+ * @param endOfText the end of text
+ * @param text the text
+ */
+ public TextBlock(final boolean endOfText, final String text) {
+ this.endOfText = endOfText;
+ this.text = text;
+ }
+
+ /**
+ * Checks if is end of text.
+ *
+ * @return true, if checks if is end of text
+ */
+ public boolean isEndOfText() {
+ return endOfText;
+ }
+
+ /**
+ * Sets whether end of text has been reached.
+ *
+ * @param endOfText the end of text flag value
+ */
+ public void setEndOfText(final boolean endOfText) {
+ this.endOfText = endOfText;
+ }
+
+ /**
+ * Gets the text of the text block.
+ *
+ * @return the text of the text block
+ */
+ public String getText() {
+ return text;
+ }
+
+ /**
+ * Sets the text of the text block.
+ *
+ * @param text the text of the text block
+ */
+ public void setText(final String text) {
+ this.text = text;
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java
new file mode 100644
index 000000000..627718402
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java
@@ -0,0 +1,46 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementers of the interface TextBlockReader read the next block of text from an input stream.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public interface TextBlockReader {
+ /**
+ * Initialize the text block reader reader.
+ *
+ * @param inputStream The stream to read from
+ */
+ void init(InputStream inputStream);
+
+ /**
+ * Read a block of text between two delimiters.
+ *
+ * @return The text block
+ * @throws IOException On reading errors
+ */
+ TextBlock readTextBlock() throws IOException;
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java
new file mode 100644
index 000000000..e48266634
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java
@@ -0,0 +1,80 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
+
+import java.io.InputStream;
+
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+/**
+ * This factory creates text block readers for breaking character streams into blocks of text.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class TextBlockReaderFactory {
+ // The logger for this class
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(TextBlockReaderFactory.class);
+
+ /**
+ * Get a text block reader for the given event protocol.
+ *
+ * @param inputStream the input stream that will be used for reading
+ * @param eventProtocolParameters the parameters that have been specified for event protocols
+ * @return the tagged reader
+ * @throws ApexEventException On an unsupported event protocol
+ */
+ public TextBlockReader getTaggedReader(final InputStream inputStream,
+ final EventProtocolParameters eventProtocolParameters) throws ApexEventException {
+ // Check the type of event protocol we have
+ if (eventProtocolParameters instanceof EventProtocolTextCharDelimitedParameters) {
+ // We have character delimited textual input
+ final EventProtocolTextCharDelimitedParameters charDelimitedParameters =
+ (EventProtocolTextCharDelimitedParameters) eventProtocolParameters;
+
+ // Create the text block reader
+ final TextBlockReader characterDelimitedTextBlockReader =
+ new CharacterDelimitedTextBlockReader(charDelimitedParameters);
+ characterDelimitedTextBlockReader.init(inputStream);
+ return characterDelimitedTextBlockReader;
+ } else if (eventProtocolParameters instanceof EventProtocolTextTokenDelimitedParameters) {
+ // We have token delimited textual input
+ final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters =
+ (EventProtocolTextTokenDelimitedParameters) eventProtocolParameters;
+
+ // Create the text block reader
+ final HeaderDelimitedTextBlockReader headerDelimitedTextBlockReader =
+ new HeaderDelimitedTextBlockReader(tokenDelimitedParameters);
+ headerDelimitedTextBlockReader.init(inputStream);
+ return headerDelimitedTextBlockReader;
+ } else {
+ final String errorMessage =
+ "could not create text block reader for a textual event protocol, the required type "
+ + eventProtocolParameters.getLabel() + " is not supported";
+ LOGGER.error(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java
new file mode 100644
index 000000000..05833ac7c
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the FILE carrier technology consumer that sends events to APEX from files, standard IO
+ * or named pipes.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java
new file mode 100644
index 000000000..de0b1b56e
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the FILE carrier technology for event input and output to and from Apex using files,
+ * named pipes, and standard IO.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
new file mode 100644
index 000000000..d5f9ff1b2
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java
@@ -0,0 +1,181 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProducer;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
+import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concrete implementation of an Apex event producer that sends events to a file.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexFileEventProducer implements ApexEventProducer {
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventProducer.class);
+
+ // The name for this producer
+ private String producerName = null;
+
+ // The output stream to write events to
+ private PrintStream eventOutputStream;
+
+ // The peer references for this event handler
+ private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap =
+ new EnumMap<>(EventHandlerPeeredMode.class);
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#init()
+ */
+ @Override
+ public void init(final String name, final EventHandlerParameters producerParameters) throws ApexEventException {
+ producerName = name;
+
+ // Get and check the Apex parameters from the parameter service
+ if (producerParameters == null) {
+ final String errorMessage = "Producer parameters for ApexFileProducer \"" + producerName + "\" is null";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+
+ // Check and get the file Properties
+ if (!(producerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) {
+ final String errorMessage = "specified producer properties for ApexFileProducer \"" + producerName
+ + "\" are not applicable to a FILE producer";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ final FILECarrierTechnologyParameters fileCarrierTechnologyParameters =
+ (FILECarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+
+ // Now we create a writer for events
+ try {
+ if (fileCarrierTechnologyParameters.isStandardError()) {
+ eventOutputStream = System.err;
+ } else if (fileCarrierTechnologyParameters.isStandardIO()) {
+ eventOutputStream = System.out;
+ } else {
+ eventOutputStream =
+ new PrintStream(new FileOutputStream(fileCarrierTechnologyParameters.getFileName()), true);
+ }
+ } catch (final IOException e) {
+ final String errorMessage = "ApexFileProducer \"" + producerName + "\" failed to open file for writing: \""
+ + fileCarrierTechnologyParameters.getFileName() + "\"";
+ LOGGER.warn(errorMessage, e);
+ throw new ApexEventException(errorMessage, e);
+ }
+
+ if (fileCarrierTechnologyParameters.getStartDelay() > 0) {
+ ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
+ */
+ @Override
+ public String getName() {
+ return producerName;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.
+ * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode,
+ * org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long,
+ * java.lang.String, java.lang.Object)
+ */
+ @Override
+ public void sendEvent(final long executionId, final String eventName, final Object event) {
+ // Check if this is a synchronized event, if so we have received a reply
+ final SynchronousEventCache synchronousEventCache =
+ (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
+ if (synchronousEventCache != null) {
+ synchronousEventCache.removeCachedEventToApexIfExists(executionId);
+ }
+
+ // Cast the event to a string, if our conversion is correctly configured, this cast should
+ // always work
+ String stringEvent = null;
+ try {
+ stringEvent = (String) event;
+ } catch (final Exception e) {
+ final String errorMessage = "error in ApexFileProducer \"" + producerName + "\" while transferring event \""
+ + event + "\" to the output stream";
+ LOGGER.debug(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ eventOutputStream.println(stringEvent);
+ eventOutputStream.flush();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ eventOutputStream.close();
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java
new file mode 100644
index 000000000..f7d7cbfbd
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the FILE carrier technology producer that outputs events from APEX to files, standard
+ * IO or named pipes.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java
new file mode 100644
index 000000000..3b21a29ca
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java
@@ -0,0 +1,433 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.policy.apex.context.SchemaHelper;
+import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
+import org.onap.policy.apex.model.basicmodel.service.ModelService;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
+import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
+import org.onap.policy.apex.model.eventmodel.concepts.AxField;
+import org.onap.policy.apex.service.engine.event.ApexEvent;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter;
+import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+/**
+ * The Class Apex2JSONEventConverter converts {@link ApexEvent} instances to and from JSON string
+ * representations of Apex events.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class Apex2JSONEventConverter implements ApexEventProtocolConverter {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2JSONEventConverter.class);
+
+ // The parameters for the JSON event protocol
+ private JSONEventProtocolParameters jsonPars;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy.
+ * apex.service.parameters.eventprotocol.EventProtocolParameters)
+ */
+ @Override
+ public void init(final EventProtocolParameters parameters) {
+ // Check and get the JSON parameters
+ if (!(parameters instanceof JSONEventProtocolParameters)) {
+ final String errorMessage = "specified consumer properties are not applicable to the JSON event protocol";
+ LOGGER.warn(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ jsonPars = (JSONEventProtocolParameters) parameters;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String,
+ * java.lang.Object)
+ */
+ @Override
+ public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException {
+ // Check the event eventObject
+ if (eventObject == null) {
+ LOGGER.warn("event processing failed, event is null");
+ throw new ApexEventException("event processing failed, event is null");
+ }
+
+ // Cast the event to a string, if our conversion is correctly configured, this cast should
+ // always work
+ String jsonEventString = null;
+ try {
+ jsonEventString = (String) eventObject;
+ } catch (final Exception e) {
+ final String errorMessage = "error converting event \"" + eventObject + "\" to a string";
+ LOGGER.debug(errorMessage, e);
+ throw new ApexEventRuntimeException(errorMessage, e);
+ }
+
+ // The list of events we will return
+ final List<ApexEvent> eventList = new ArrayList<ApexEvent>();
+
+ try {
+ // We may have a single JSON object with a single event or an array of JSON objects
+ final Object decodedJsonObject =
+ new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, Object.class);
+
+ // Check if we have a list of objects
+ if (decodedJsonObject instanceof List) {
+ // Check if it's a list of JSON objects or a list of strings
+ @SuppressWarnings("unchecked")
+ final List<Object> decodedJsonList = (List<Object>) decodedJsonObject;
+
+ // Decode each of the list elements in sequence
+ for (final Object jsonListObject : decodedJsonList) {
+ if (jsonListObject instanceof String) {
+ eventList.add(jsonStringApexEvent(eventName, (String) jsonListObject));
+ } else if (jsonListObject instanceof JsonObject) {
+ eventList.add(jsonObject2ApexEvent(eventName, (JsonObject) jsonListObject));
+ } else {
+ throw new ApexEventException("incoming event (" + jsonEventString
+ + ") is a JSON object array containing an invalid object " + jsonListObject);
+ }
+ }
+ } else {
+ eventList.add(jsonStringApexEvent(eventName, jsonEventString));
+ }
+ } catch (final Exception e) {
+ final String errorString =
+ "Failed to unmarshal JSON event: " + e.getMessage() + ", event=" + jsonEventString;
+ LOGGER.warn(errorString, e);
+ throw new ApexEventException(errorString, e);
+ }
+
+ // Return the list of events we have unmarshalled
+ return eventList;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy.
+ * apex.service.engine.event.ApexEvent)
+ */
+ @Override
+ public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException {
+ // Check the Apex event
+ if (apexEvent == null) {
+ LOGGER.warn("event processing failed, Apex event is null");
+ throw new ApexEventException("event processing failed, Apex event is null");
+ }
+
+ // Get the event definition for the event from the model service
+ final AxEvent eventDefinition =
+ ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion());
+
+ // Use a GSON Json object to marshal the Apex event to JSON
+ final Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();
+ final JsonObject jsonObject = new JsonObject();
+
+ jsonObject.addProperty(ApexEvent.NAME_HEADER_FIELD, apexEvent.getName());
+ jsonObject.addProperty(ApexEvent.VERSION_HEADER_FIELD, apexEvent.getVersion());
+ jsonObject.addProperty(ApexEvent.NAMESPACE_HEADER_FIELD, apexEvent.getNameSpace());
+ jsonObject.addProperty(ApexEvent.SOURCE_HEADER_FIELD, apexEvent.getSource());
+ jsonObject.addProperty(ApexEvent.TARGET_HEADER_FIELD, apexEvent.getTarget());
+
+ if (apexEvent.getExceptionMessage() != null) {
+ jsonObject.addProperty(ApexEvent.EXCEPTION_MESSAGE_HEADER_FIELD, apexEvent.getExceptionMessage());
+ }
+
+ for (final AxField eventField : eventDefinition.getFields()) {
+ final String fieldName = eventField.getKey().getLocalName();
+
+ if (!apexEvent.containsKey(fieldName)) {
+ if (!eventField.getOptional()) {
+ final String errorMessage = "error parsing " + eventDefinition.getID() + " event to Json. "
+ + "Field \"" + fieldName + "\" is missing, but is mandatory. Fields: " + apexEvent;
+ LOGGER.debug(errorMessage);
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+ continue;
+ }
+
+ final Object fieldValue = apexEvent.get(fieldName);
+
+ // Get the schema helper
+ final SchemaHelper fieldSchemaHelper =
+ new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema());
+ jsonObject.add(fieldName, fieldSchemaHelper.marshal2JsonElement(fieldValue));
+ }
+
+ // Output JSON string in a pretty format
+ return gson.toJson(jsonObject);
+ }
+
+ /**
+ * This method converts a JSON object into an Apex event.
+ *
+ * @param eventName the name of the event
+ * @param jsonEventString the JSON string that holds the event
+ * @return the apex event that we have converted the JSON object into
+ * @throws ApexEventException thrown on unmarshaling exceptions
+ */
+ private ApexEvent jsonStringApexEvent(final String eventName, final String jsonEventString)
+ throws ApexEventException {
+ // Use GSON to read the event string
+ final JsonObject jsonObject =
+ new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, JsonObject.class);
+
+ if (jsonObject == null || !jsonObject.isJsonObject()) {
+ throw new ApexEventException(
+ "incoming event (" + jsonEventString + ") is not a JSON object or an JSON object array");
+ }
+
+ return jsonObject2ApexEvent(eventName, jsonObject);
+ }
+
+ /**
+ * This method converts a JSON object into an Apex event.
+ *
+ * @param eventName the name of the event
+ * @param jsonObject the JSON object that holds the event
+ * @return the apex event that we have converted the JSON object into
+ * @throws ApexEventException thrown on unmarshaling exceptions
+ */
+ private ApexEvent jsonObject2ApexEvent(final String eventName, final JsonObject jsonObject)
+ throws ApexEventException {
+ // Process the mandatory Apex header
+ final ApexEvent apexEvent = processApexEventHeader(eventName, jsonObject);
+
+ // Get the event definition for the event from the model service
+ final AxEvent eventDefinition =
+ ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion());
+
+ // Iterate over the input fields in the event
+ for (final AxField eventField : eventDefinition.getFields()) {
+ final String fieldName = eventField.getKey().getLocalName();
+ if (!hasJSONField(jsonObject, fieldName)) {
+ if (!eventField.getOptional()) {
+ final String errorMessage = "error parsing " + eventDefinition.getID() + " event from Json. "
+ + "Field \"" + fieldName + "\" is missing, but is mandatory.";
+ LOGGER.debug(errorMessage);
+ throw new ApexEventException(errorMessage);
+ }
+ continue;
+ }
+
+ final JsonElement fieldValue = getJSONField(jsonObject, fieldName, null, !eventField.getOptional());
+
+ if (fieldValue != null && !fieldValue.isJsonNull()) {
+ // Get the schema helper
+ final SchemaHelper fieldSchemaHelper =
+ new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema());
+ apexEvent.put(fieldName, fieldSchemaHelper.createNewInstance(fieldValue));
+ } else {
+ apexEvent.put(fieldName, null);
+ }
+ }
+ return apexEvent;
+
+ }
+
+ /**
+ * This method processes the event header of an Apex event.
+ *
+ * @param eventName the name of the event
+ * @param jsonObject the JSON object containing the JSON representation of the incoming event
+ * @return an apex event constructed using the header fields of the event
+ * @throws ApexEventRuntimeException the apex event runtime exception
+ * @throws ApexEventException on invalid events with missing header fields
+ */
+ private ApexEvent processApexEventHeader(final String eventName, final JsonObject jsonObject)
+ throws ApexEventRuntimeException, ApexEventException {
+ // Get the event header fields
+ // @formatter:off
+ String name = getJSONStringField(jsonObject, ApexEvent.NAME_HEADER_FIELD, jsonPars.getNameAlias(), ApexEvent.NAME_REGEXP, false);
+ String version = getJSONStringField(jsonObject, ApexEvent.VERSION_HEADER_FIELD, jsonPars.getVersionAlias(), ApexEvent.VERSION_REGEXP, false);
+ String namespace = getJSONStringField(jsonObject, ApexEvent.NAMESPACE_HEADER_FIELD, jsonPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false);
+ String source = getJSONStringField(jsonObject, ApexEvent.SOURCE_HEADER_FIELD, jsonPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false);
+ String target = getJSONStringField(jsonObject, ApexEvent.TARGET_HEADER_FIELD, jsonPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false);
+ // @formatter:on
+
+ // Check if an event name was specified on the event parameters
+ if (eventName != null) {
+ if (name != null && !eventName.equals(name)) {
+ LOGGER.warn("The incoming event name \"" + name + "\" does not match the configured event name \""
+ + eventName + "\", using configured event name");
+ }
+ name = eventName;
+ } else {
+ if (name == null) {
+ throw new ApexEventRuntimeException(
+ "event received without mandatory parameter \"name\" on configuration or on event");
+ }
+ }
+
+ // Now, find the event definition in the model service. If version is null, the newest event
+ // definition in the model service is used
+ final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(name, version);
+ if (eventDefinition == null) {
+ if (version == null) {
+ throw new ApexEventRuntimeException(
+ "an event definition for an event named \"" + name + "\" not found in Apex model");
+ } else {
+ throw new ApexEventRuntimeException("an event definition for an event named \"" + name
+ + "\" with version \"" + version + "\" not found in Apex model");
+ }
+ }
+
+ // Use the defined event version if no version is specified on the incoming fields
+ if (version == null) {
+ version = eventDefinition.getKey().getVersion();
+ }
+
+ // Check the name space is OK if it is defined, if not, use the name space from the model
+ if (namespace != null) {
+ if (!namespace.equals(eventDefinition.getNameSpace())) {
+ throw new ApexEventRuntimeException(
+ "namespace \"" + namespace + "\" on event \"" + name + "\" does not match namespace \""
+ + eventDefinition.getNameSpace() + "\" for that event in the Apex model");
+ }
+ } else {
+ namespace = eventDefinition.getNameSpace();
+ }
+
+ // For source, use the defined source only if the source is not found on the incoming event
+ if (source == null) {
+ source = eventDefinition.getSource();
+ }
+
+ // For target, use the defined source only if the source is not found on the incoming event
+ if (target == null) {
+ target = eventDefinition.getTarget();
+ }
+
+ return new ApexEvent(name, version, namespace, source, target);
+ }
+
+ /**
+ * This method gets an event string field from a JSON object.
+ *
+ * @param jsonObject the JSON object containing the JSON representation of the incoming event
+ * @param fieldName the field name to find in the event
+ * @param fieldAlias the alias for the field to find in the event, overrides the field name if
+ * it is not null
+ * @param fieldRE the regular expression to check the field against for validity
+ * @param mandatory true if the field is mandatory
+ * @return the value of the field in the JSON object or null if the field is optional
+ * @throws ApexEventRuntimeException the apex event runtime exception
+ */
+ private String getJSONStringField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
+ final String fieldRE, final boolean mandatory) throws ApexEventRuntimeException {
+ // Get the JSON field for the string field
+ final JsonElement jsonField = getJSONField(jsonObject, fieldName, fieldAlias, mandatory);
+
+ // Null strings are allowed
+ if (jsonField == null || jsonField.isJsonNull()) {
+ return null;
+ }
+
+ // Check if this is a string field
+ String fieldValueString = null;
+ try {
+ fieldValueString = jsonField.getAsString();
+ } catch (final Exception e) {
+ // The element is not a string so throw an error
+ throw new ApexEventRuntimeException("field \"" + fieldName + "\" with type \""
+ + jsonField.getClass().getCanonicalName() + "\" is not a string value");
+ }
+
+ // Is regular expression checking required
+ if (fieldRE == null) {
+ return fieldValueString;
+ }
+
+ // Check the event field against its regular expression
+ if (!fieldValueString.matches(fieldRE)) {
+ throw new ApexEventRuntimeException(
+ "field \"" + fieldName + "\" with value \"" + fieldValueString + "\" is invalid");
+ }
+
+ return fieldValueString;
+ }
+
+ /**
+ * This method gets an event field from a JSON object.
+ *
+ * @param jsonObject the JSON object containing the JSON representation of the incoming event
+ * @param fieldName the field name to find in the event
+ * @param fieldAlias the alias for the field to find in the event, overrides the field name if
+ * it is not null
+ * @param mandatory true if the field is mandatory
+ * @return the value of the field in the JSON object or null if the field is optional
+ * @throws ApexEventRuntimeException the apex event runtime exception
+ */
+ private JsonElement getJSONField(final JsonObject jsonObject, final String fieldName, final String fieldAlias,
+ final boolean mandatory) throws ApexEventRuntimeException {
+
+ // Check if we should use the alias for this field
+ String fieldToFind = fieldName;
+ if (fieldAlias != null) {
+ fieldToFind = fieldAlias;
+ }
+
+ // Get the event field
+ final JsonElement eventElement = jsonObject.get(fieldToFind);
+ if (eventElement == null) {
+ if (!mandatory) {
+ return null;
+ } else {
+ throw new ApexEventRuntimeException("mandatory field \"" + fieldToFind + "\" is missing");
+ }
+ }
+
+ return eventElement;
+ }
+
+ /**
+ * This method if a JSON object has a named field.
+ *
+ * @param jsonObject the JSON object containing the JSON representation of the incoming event
+ * @param fieldName the field name to find in the event
+ * @return true if the field is present
+ * @throws ApexEventRuntimeException the apex event runtime exception
+ */
+ private boolean hasJSONField(final JsonObject jsonObject, final String fieldName) throws ApexEventRuntimeException {
+ // check for the field
+ return jsonObject.has(fieldName);
+ }
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java
new file mode 100644
index 000000000..b4a4055d7
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java
@@ -0,0 +1,135 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin;
+
+import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters;
+
+/**
+ * Event protocol parameters for JSON as an event protocol.
+ *
+ * The parameters for this plugin are:
+ * <ol>
+ * <li>nameAlias: The field in a JSON event to use as an alias for the event name. This parameter is
+ * optional.
+ * <li>versionAlias: The field in a JSON event to use as an alias for the event version. This
+ * parameter is optional.
+ * <li>nameSpaceAlias: The field in a JSON event to use as an alias for the event name space. This
+ * parameter is optional.
+ * <li>sourceAlias: The field in a JSON event to use as an alias for the event source. This
+ * parameter is optional.
+ * <li>targetAlias: The field in a JSON event to use as an alias for the event target. This
+ * parameter is optional.
+ * </ol>
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedParameters {
+ /** The label of this event protocol. */
+ public static final String JSON_EVENT_PROTOCOL_LABEL = "JSON";
+
+ // Constants for text block delimiters
+ private static final char JSON_TEXT_BLOCK_START_DELIMITER = '{';
+ private static final char JSON_TEXT_BLOCK_END_DELIMITER = '}';
+
+ // Aliases for Apex event header fields
+ // @formatter:off
+ private final String nameAlias = null;
+ private final String versionAlias = null;
+ private final String nameSpaceAlias = null;
+ private final String sourceAlias = null;
+ private final String targetAlias = null;
+ // @formatter:on
+
+ /**
+ * Constructor to create a JSON event protocol parameter instance and register the instance with
+ * the parameter service.
+ */
+ public JSONEventProtocolParameters() {
+ this(JSONEventProtocolParameters.class.getCanonicalName(), JSON_EVENT_PROTOCOL_LABEL);
+ }
+
+ /**
+ * Constructor to create an event protocol parameters instance with the name of a sub class of
+ * this class.
+ *
+ * @param parameterClassName the class name of a sub class of this class
+ * @param eventProtocolLabel the name of the event protocol for this plugin
+ */
+ public JSONEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) {
+ super(parameterClassName);
+
+ // Set the event protocol properties for the JSON event protocol
+ this.setLabel(eventProtocolLabel);
+
+ // Set the starting and ending delimiters for text blocks of JSON events
+ this.setStartChar(JSON_TEXT_BLOCK_START_DELIMITER);
+ this.setEndChar(JSON_TEXT_BLOCK_END_DELIMITER);
+
+ // Set the event protocol plugin class
+ this.setEventProtocolPluginClass(Apex2JSONEventConverter.class.getCanonicalName());
+ }
+
+ /**
+ * Gets the name alias.
+ *
+ * @return the name alias
+ */
+ public String getNameAlias() {
+ return nameAlias;
+ }
+
+ /**
+ * Gets the version alias.
+ *
+ * @return the version alias
+ */
+ public String getVersionAlias() {
+ return versionAlias;
+ }
+
+ /**
+ * Gets the name space alias.
+ *
+ * @return the name space alias
+ */
+ public String getNameSpaceAlias() {
+ return nameSpaceAlias;
+ }
+
+ /**
+ * Gets the source alias.
+ *
+ * @return the source alias
+ */
+ public String getSourceAlias() {
+ return sourceAlias;
+ }
+
+ /**
+ * Gets the target alias.
+ *
+ * @return the target alias
+ */
+ public String getTargetAlias() {
+ return targetAlias;
+ }
+
+}
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java
new file mode 100644
index 000000000..65f4831ec
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Contains the implementation of the APEX event protocol cinverter plugin for events in Json
+ * format.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java
new file mode 100644
index 000000000..ae2d58701
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Contains implementations for conversion between externally facing
+ * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal APEX engine
+ * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances. It also contains the
+ * implementation of the default APEX File carrier technology plugin as well as the protocol plugins
+ * for XML and JSON event protocols.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event.impl;
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java
new file mode 100644
index 000000000..75404137e
--- /dev/null
+++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/package-info.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Provides a generic externally-facing {@link ApexEvent} class that can be sent into an APEX engine
+ * and processed by an APEX engine. It provides the producer {@link ApexEventProducer} producer and
+ * {@link ApexEventConsumer} consumer interfaces that APEX uses to send events to and receive events
+ * from other systems. It also provides the {@link ApexEventConverter} interface that can be
+ * implemented by plugins that wish to convert some external event format into the APEX event
+ * format. It also provides a periodic event generator that can be used to send periodic events into
+ * an APEX engine for triggering of policies to carry out housekeeping tasks.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+package org.onap.policy.apex.service.engine.event;